Skip to content

Commit

Permalink
WProperty initializers (#73)
Browse files Browse the repository at this point in the history
* add 2nd cmd to grpc

* updating nugets

* rm ns2 if debug

* rm dupe task timeout

* do not install 3.1

* code analysis clean

* finishing code analysis

* clean mmemon csproj

* start memmon2

* empty commands

* empty commands

* Add empty commands to hub binders

* Add empty commands to hub binders

* added primes and malloc/free

* tryDeserialize

* clean retained

* remove from bytes

* start memmon2

* added primes and malloc/free

* clean retained

* remove from bytes

* 2nd telemetry

* ref init props

* GetTwin discard sub

* hub requires # to subscribe to commands

* ref init properties

* right GC stats

* exclude gettwin test

* exclude gettwin test

* clean Twin Initializer

* split subs in c2d binder

* memmon factory as service

* upd model

* clean warnings

* use pi-sense factory

* delete unused code

Co-authored-by: rido-min <rido-min@users.noreply.github.com>
  • Loading branch information
ridomin and rido-min authored Oct 11, 2022
1 parent 6b33686 commit 0133048
Show file tree
Hide file tree
Showing 88 changed files with 817 additions and 552 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/push2nuget.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ jobs:
- name: Setup .NET
uses: actions/setup-dotnet@v2
with:
dotnet-version: |
6.0.x
3.1.x
dotnet-version: 6.0.x

- name: Restore dependencies
run: dotnet restore
Expand Down
4 changes: 2 additions & 2 deletions samples/iothub-sample/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ public Device(ILogger<Device> logger, IConfiguration configuration)
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var connectionSettings = new ConnectionSettings(_configuration.GetConnectionString("cs"));
_logger.LogWarning($"Connecting to: {connectionSettings}");
_logger.LogWarning("Connecting to: {connectionSettings}", connectionSettings);

var client = new HubMqttClient(await HubDpsFactory.CreateFromConnectionSettingsAsync(connectionSettings, stoppingToken));

var v = await client.UpdateTwinAsync(new { started = DateTime.Now }, stoppingToken);
_logger.LogInformation($" Updated Twin to verison: {v} ");
_logger.LogInformation("Updated Twin to verison: {v}", v);
var twin = await client.GetTwinAsync(stoppingToken);
Console.WriteLine(twin);

Expand Down
10 changes: 4 additions & 6 deletions samples/memmon-protobuff/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ public class Device : BackgroundService
private double telemetryWorkingSet = 0;
private const bool default_enabled = true;
private const int default_interval = 45;

private string lastDiscconectReason = string.Empty;


private MemmonClient client;
private ConnectionSettings connectionSettings;

Expand All @@ -44,7 +42,7 @@ public Device(ILogger<Device> logger, IConfiguration configuration, TelemetryCli
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var cs = new ConnectionSettings(_configuration.GetConnectionString("cs")) { ModelId = MemmonClient.ModelId };
_logger.LogWarning($"Connecting to..{cs}");
_logger.LogWarning("Connecting to..{cs}", cs);
var mqtt = await BrokerClientFactory.CreateFromConnectionSettingsAsync(cs, true, stoppingToken);
connectionSettings = cs;
mqtt.DisconnectedAsync += Connection_DisconnectedAsync;
Expand All @@ -63,7 +61,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
client.Props.Interval = default_interval;


await client.AllProperties.SendMessageAsync(client.Props);
await client.AllProperties.SendMessageAsync(client.Props, stoppingToken);

RefreshScreen(this);

Expand All @@ -88,7 +86,7 @@ private async Task Connection_DisconnectedAsync(MQTTnet.Client.MqttClientDisconn
_telemetryClient.TrackException(arg.Exception);
}

lastDiscconectReason = arg.ReasonString;
//lastDiscconectReason = arg.ReasonString;
reconnectCounter++;
await Task.Yield();
}
Expand Down
2 changes: 1 addition & 1 deletion samples/memmon-protobuff/_protos/memmon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace _protos
internal class MemmonClient
{
internal const string ModelId = "rido.memmon";
public Properties Props = new Properties();
public Properties Props = new();
public IReadOnlyProperty<Properties> AllProperties { get; set; }
public IWritableProperty<Properties, ack> Property_interval { get; set; }
public IWritableProperty<Properties, ack> Property_enabled { get; set; }
Expand Down
2 changes: 1 addition & 1 deletion samples/memmon-protobuff/memmon-protobuff.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.21.6" />
<PackageReference Include="Google.Protobuf" Version="3.21.7" />
<PackageReference Include="Grpc.Tools" Version="2.49.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
102 changes: 71 additions & 31 deletions samples/memmon/Device.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
using dtmi_rido_pnp_memmon;
using dtmi_rido_memmon;
using Humanizer;
using Microsoft.ApplicationInsights;
using MQTTnet.Extensions.MultiCloud;
using MQTTnet.Extensions.MultiCloud.AzureIoTClient;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
using MQTTnet.Extensions.MultiCloud.Connections;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Text;

namespace memmon;

public class Device : BackgroundService
{
private readonly ILogger<Device> _logger;
private readonly IConfiguration _configuration;
private readonly TelemetryClient _telemetryClient;

private readonly Stopwatch clock = Stopwatch.StartNew();
Expand All @@ -23,65 +20,60 @@ public class Device : BackgroundService
private int reconnectCounter = 0;

private double telemetryWorkingSet = 0;
private double managedMemory = 0;
private const bool default_enabled = true;
private const int default_interval = 45;
private const int default_interval = 500;

private string lastDiscconectReason = string.Empty;

private Imemmon client;
private ConnectionSettings connectionSettings;
private readonly MemMonFactory memmonFactory;

private string infoVersion = string.Empty;

public Device(ILogger<Device> logger, IConfiguration configuration, TelemetryClient tc)
public Device(TelemetryClient tc, MemMonFactory clientFactory)
{
_logger = logger;
_configuration = configuration;
_telemetryClient = tc;
memmonFactory = clientFactory;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var cs = new ConnectionSettings(_configuration.GetConnectionString("cs"));
_logger.LogWarning($"Connecting to..{cs}");
var memmonFactory = new MemMonFactory(_configuration);
client = await memmonFactory.CreateMemMonClientAsync(_configuration.GetConnectionString("cs"), stoppingToken);
client = await memmonFactory.CreateMemMonClientAsync("cs", stoppingToken);

client.Connection.DisconnectedAsync += Connection_DisconnectedAsync;

connectionSettings = MemMonFactory.connectionSettings;
_logger.LogWarning("Connected");

infoVersion = MemMonFactory.NuGetPackageVersion;

client.Property_enabled.OnMessage = Property_enabled_UpdateHandler;
client.Property_interval.OnMessage= Property_interval_UpdateHandler;
client.Command_getRuntimeStats.OnMessage= Command_getRuntimeStats_Handler;
client.Command_isPrime.OnMessage = Command_isPrime_Handler;
client.Command_malloc.OnMessage = Command_malloc_Hanlder;
client.Command_free.OnMessage = Command_free_Hanlder;

if (client is HubMqttClient)
{
await TwinInitializer.InitPropertyAsync(client.Connection, client.InitialState, client.Property_interval, "interval", default_interval);
await TwinInitializer.InitPropertyAsync(client.Connection, client.InitialState, client.Property_enabled, "enabled", default_enabled);
}
else
{
await PropertyInitializer.InitPropertyAsync(client.Property_interval, default_interval);
await PropertyInitializer.InitPropertyAsync(client.Property_enabled, default_enabled);
}
await client.Property_enabled.InitPropertyAsync(client.InitialState, default_enabled, stoppingToken);
await client.Property_interval.InitPropertyAsync(client.InitialState, default_interval, stoppingToken);


await client.Property_started.SendMessageAsync(DateTime.Now);
await client.Property_started.SendMessageAsync(DateTime.Now, stoppingToken);

RefreshScreen(this);

while (!stoppingToken.IsCancellationRequested)
{
if (client.Property_enabled.Value == true)
{
telemetryWorkingSet = Environment.WorkingSet;
telemetryWorkingSet = Environment.WorkingSet.Bytes().Megabytes;
managedMemory = GC.GetTotalMemory(true).Bytes().Megabytes;
await client.Telemetry_workingSet.SendMessageAsync(telemetryWorkingSet, stoppingToken);
await client.Telemetry_managedMemory.SendMessageAsync(managedMemory, stoppingToken);
telemetryCounter++;
_telemetryClient.TrackMetric("WorkingSet", telemetryWorkingSet);
_telemetryClient.TrackMetric("managedMemory", managedMemory);
}
await Task.Delay(client.Property_interval.Value * 1000, stoppingToken);
await Task.Delay(client.Property_interval.Value, stoppingToken);
}
}

Expand Down Expand Up @@ -150,6 +142,51 @@ private async Task<Ack<int>> Property_interval_UpdateHandler(int p)
return await Task.FromResult(ack);
}

private async Task<bool> Command_isPrime_Handler(int number)
{
commandCounter++;
IEnumerable<string> Multiples(int number)
{
return from n1 in Enumerable.Range(2, number / 2)
from n2 in Enumerable.Range(2, n1)
where n1 * n2 == number
select $"{n1} x {n2} => {number}";
}

bool result = Multiples(number).Any();
return await Task.FromResult(!result);

}

List<string> memory = new();
IntPtr memoryPtr = IntPtr.Zero;
private async Task<string> Command_malloc_Hanlder(int number)
{
commandCounter++;
for (int i = 0; i < number; i++)
{
memory.Add(i.ToOrdinalWords());
}

memoryPtr = Marshal.AllocHGlobal(number);
return await Task.FromResult(string.Empty);
}

private async Task<string> Command_free_Hanlder(string empty)
{
commandCounter++;
await _telemetryClient.FlushAsync(CancellationToken.None);
memory = new List<string>();
GC.Collect(2, GCCollectionMode.Forced, false);
if (memoryPtr != IntPtr.Zero)
{
Marshal.FreeHGlobal(memoryPtr);
memoryPtr = IntPtr.Zero;
}
return await Task.FromResult(string.Empty);
}


private async Task<Dictionary<string, string>> Command_getRuntimeStats_Handler(DiagnosticsMode req)
{
commandCounter++;
Expand Down Expand Up @@ -180,6 +217,8 @@ private async Task<Dictionary<string, string>> Command_getRuntimeStats_Handler(D
result.Add("telemetry: ", telemetryCounter.ToString());
result.Add("command: ", commandCounter.ToString());
result.Add("reconnects: ", reconnectCounter.ToString());
result.Add("workingSet", Environment.WorkingSet.Bytes().ToString());
result.Add("GC Memmory", GC.GetTotalAllocatedBytes().Bytes().ToString());
}
return await Task.FromResult(result);
}
Expand Down Expand Up @@ -212,7 +251,8 @@ string RenderData()
//AppendLineWithPadRight(sb, $"Twin send: {RidCounter.Current}");
AppendLineWithPadRight(sb, $"Command messages: {commandCounter}");
AppendLineWithPadRight(sb, " ");
AppendLineWithPadRight(sb, $"WorkingSet: {telemetryWorkingSet.Bytes()}");
AppendLineWithPadRight(sb, $"WorkingSet: {telemetryWorkingSet} MB");
AppendLineWithPadRight(sb, $"ManagedMemory: {managedMemory} MB");
AppendLineWithPadRight(sb, " ");
AppendLineWithPadRight(sb, $"Time Running: {TimeSpan.FromMilliseconds(clock.ElapsedMilliseconds).Humanize(3)}");
AppendLineWithPadRight(sb, $"ConnectionStatus: {client.Connection.IsConnected} [{lastDiscconectReason}]");
Expand Down
52 changes: 31 additions & 21 deletions samples/memmon/MemMonFactory.cs
Original file line number Diff line number Diff line change
@@ -1,31 +1,35 @@
using dtmi_rido_pnp_memmon;
using dtmi_rido_memmon;
using MQTTnet.Extensions.MultiCloud.AwsIoTClient;
using MQTTnet.Extensions.MultiCloud.AzureIoTClient;
using MQTTnet.Extensions.MultiCloud.BrokerIoTClient;
using MQTTnet.Extensions.MultiCloud.Connections;

namespace memmon;

internal class MemMonFactory
public class MemMonFactory
{
static string nugetPackageVersion = string.Empty;
public static string NuGetPackageVersion => nugetPackageVersion;
internal static string ComputeDeviceKey(string masterKey, string deviceId) =>
Convert.ToBase64String(new System.Security.Cryptography.HMACSHA256(Convert.FromBase64String(masterKey)).ComputeHash(System.Text.Encoding.UTF8.GetBytes(deviceId)));

IConfiguration _configuration;
readonly IConfiguration _configuration;
readonly ILogger<MemMonFactory> _logger;

internal static ConnectionSettings connectionSettings;

public MemMonFactory(IConfiguration configuration)
public MemMonFactory(IConfiguration configuration, ILogger<MemMonFactory> logger)
{
this._configuration = configuration;
_configuration = configuration;
_logger = logger;
}

public async Task<Imemmon> CreateMemMonClientAsync(string connectionString, CancellationToken cancellationToken = default)
public async Task<Imemmon> CreateMemMonClientAsync(string connectinStringName, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(connectionString, nameof(connectionString));
connectionSettings = new ConnectionSettings(_configuration.GetConnectionString("cs"));
Imemmon client;
string connectionString = _configuration.GetConnectionString(connectinStringName);
connectionSettings = new ConnectionSettings(connectionString);
_logger.LogWarning("Connecting to..{cs}", connectionSettings);
if (connectionString.Contains("IdScope") || connectionString.Contains("SharedAccessKey"))
{
if (connectionSettings.IdScope != null && _configuration["masterKey"] != null)
Expand All @@ -34,52 +38,58 @@ public async Task<Imemmon> CreateMemMonClientAsync(string connectionString, Canc
var masterKey = _configuration.GetValue<string>("masterKey");
var deviceKey = ComputeDeviceKey(masterKey, deviceId);
var newCs = $"IdScope={connectionSettings.IdScope};DeviceId={deviceId};SharedAccessKey={deviceKey};SasMinutes={connectionSettings.SasMinutes}";
return await CreateHubClientAsync(newCs, cancellationToken);
client = await CreateHubClientAsync(newCs, cancellationToken);
}
else
{
return await CreateHubClientAsync(connectionString, cancellationToken);
client = await CreateHubClientAsync(connectionString, cancellationToken);
}
}
else if (connectionSettings.HostName.Contains("amazonaws.com"))
{
return await CreateAwsClientAsync(connectionString, cancellationToken);
client = await CreateAwsClientAsync(connectionString, cancellationToken);
}
else if (connectionSettings.HostName.Contains("azure-devices.net"))
{
return await CreateHubClientAsync(connectionString, cancellationToken);
client = await CreateHubClientAsync(connectionString, cancellationToken);
}
else
{
return await CreateBrokerClientAsync(connectionString, cancellationToken);
client = await CreateBrokerClientAsync(connectionString, cancellationToken);
}

_logger.LogWarning("Connected");
return client;
}

static async Task<dtmi_rido_pnp_memmon.mqtt.memmon> CreateBrokerClientAsync(string connectionString, CancellationToken cancellationToken = default)
static async Task<dtmi_rido_memmon.mqtt.memmon> CreateBrokerClientAsync(string connectionString, CancellationToken cancellationToken = default)
{
var cs = new ConnectionSettings(connectionString) { ModelId = Imemmon.ModelId };
var mqtt = await BrokerClientFactory.CreateFromConnectionSettingsAsync(cs, true, cancellationToken);
connectionSettings = BrokerClientFactory.ComputedSettings;
var client = new dtmi_rido_pnp_memmon.mqtt.memmon(mqtt);
var client = new dtmi_rido_memmon.mqtt.memmon(mqtt)
{
InitialState = String.Empty
};
nugetPackageVersion = BrokerClientFactory.NuGetPackageVersion;
return client;
}

static async Task<dtmi_rido_pnp_memmon.hub.memmon> CreateHubClientAsync(string connectionString, CancellationToken cancellationToken = default)
static async Task<dtmi_rido_memmon.hub.memmon> CreateHubClientAsync(string connectionString, CancellationToken cancellationToken = default)
{
var cs = new ConnectionSettings(connectionString) { ModelId = Imemmon.ModelId };
var hub = await HubDpsFactory.CreateFromConnectionSettingsAsync(cs);
var hub = await HubDpsFactory.CreateFromConnectionSettingsAsync(cs, cancellationToken);
connectionSettings = HubDpsFactory.ComputedSettings;
var client = new dtmi_rido_pnp_memmon.hub.memmon(hub);
var client = new dtmi_rido_memmon.hub.memmon(hub);
nugetPackageVersion = HubDpsFactory.NuGetPackageVersion;
client.InitialState = await client.GetTwinAsync();
client.InitialState = await client.GetTwinAsync(cancellationToken);
return client;
}

static async Task<dtmi_rido_pnp_memmon.aws.memmon> CreateAwsClientAsync(string connectionString, CancellationToken cancellationToken = default)
static async Task<dtmi_rido_memmon.aws.memmon> CreateAwsClientAsync(string connectionString, CancellationToken cancellationToken = default)
{
var mqtt = await AwsClientFactory.CreateFromConnectionSettingsAsync(connectionString, cancellationToken);
var client = new dtmi_rido_pnp_memmon.aws.memmon(mqtt);
var client = new dtmi_rido_memmon.aws.memmon(mqtt);
nugetPackageVersion = AwsClientFactory.NuGetPackageVersion;
return client;
}
Expand Down
1 change: 1 addition & 0 deletions samples/memmon/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
.ConfigureServices(services =>
{
services.AddApplicationInsightsTelemetryWorkerService();
services.AddSingleton<MemMonFactory>();
services.AddHostedService<Device>();
})
.Build();
Expand Down
2 changes: 1 addition & 1 deletion samples/memmon/Properties/launchSettings.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"mosquitto_tls": {
"commandName": "Project",
"environmentVariables": {
"ConnectionStrings__cs": "HostName=test.mosquitto.org;ClientId=memmon01;CaFile=test.mosquitto.org.crt"
"ConnectionStrings__cs": "HostName=test.mosquitto.org;ClientId=memmon01;CaFile=test.mosquitto.org.pem"
}
}
}
Expand Down
File renamed without changes.
Loading

0 comments on commit 0133048

Please sign in to comment.