Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process LoRa Message only if comming from a device and can be deserialized #1891

Merged
merged 1 commit into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/AzureIoTHub.Portal.Infrastructure/ConnectionAuthMethod.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) CGI France. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace AzureIoTHub.Portal.Infrastructure
{
internal class ConnectionAuthMethod
{
internal enum ConnectionAuthScope
{
Hub,
Device,
Module
}

internal enum ConnectionAuthType
{
Symkey,
Sas,
X509
}

public ConnectionAuthScope Scope { get; set; }

public ConnectionAuthType Type { get; set; }

public string Issuer { get; set; }
}
}
54 changes: 37 additions & 17 deletions src/AzureIoTHub.Portal.Server/Services/LoRaWanDeviceService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace AzureIoTHub.Portal.Server.Services
using System.Globalization;
using System.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using AutoMapper;
using Azure.Messaging.EventHubs;
Expand All @@ -24,6 +25,7 @@ namespace AzureIoTHub.Portal.Server.Services
using Microsoft.Extensions.Logging;
using Models.v10;
using Models.v10.LoRaWAN;
using static AzureIoTHub.Portal.Infrastructure.ConnectionAuthMethod;

public class LoRaWanDeviceService : DeviceServiceBase<LoRaDeviceDetails>
{
Expand Down Expand Up @@ -161,38 +163,57 @@ public override async Task ProcessTelemetryEvent(EventData eventMessage)

LoRaDeviceTelemetry deviceTelemetry;

try
if (!eventMessage.SystemProperties.TryGetValue("iothub-connection-auth-method", out var authMethod))
{
deviceTelemetry = new LoRaDeviceTelemetry
{
Id = eventMessage.SequenceNumber.ToString(CultureInfo.InvariantCulture),
EnqueuedTime = eventMessage.EnqueuedTime.UtcDateTime,
Telemetry = eventMessage.EventBody.ToObjectFromJson<LoRaTelemetry>()
};
this.logger.LogWarning($"Unable read 'iothub-connection-auth-method' property of the message. Please verify that the event is comming from an IoT Device.");
return;
}
catch (JsonException)

var eventAuthMethod = JsonSerializer.Deserialize<ConnectionAuthMethod>(authMethod.ToString(), new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
Converters =
{
new JsonStringEnumConverter(JsonNamingPolicy.CamelCase)
}
});

if (eventAuthMethod.Scope != ConnectionAuthScope.Device)
{
this.logger.LogWarning($"Unable to deserialize the event message with id {eventMessage.SequenceNumber} as device telemetry");
this.logger.LogTrace($"Event wasn't issued by a device. Skipping this event.");
return;
}

var loRaWanDevice = await this.lorawanDeviceRepository.GetByIdAsync(deviceTelemetry.Telemetry.DeviceEUI, device => device.Telemetry);
if (!eventMessage.SystemProperties.TryGetValue("iothub-connection-device-id", out var deviceId))
{
this.logger.LogWarning($"Unable read 'iothub-connection-device-id' property of the message. Please verify that the event is comming from an IoT Device.");
return;
}

var loRaWanDevice = await this.lorawanDeviceRepository.GetByIdAsync(deviceId, device => device.Telemetry);

if (loRaWanDevice == null)
{
return;
}

deviceTelemetry = new LoRaDeviceTelemetry
{
Id = eventMessage.SequenceNumber.ToString(CultureInfo.InvariantCulture),
EnqueuedTime = eventMessage.EnqueuedTime.UtcDateTime,
Telemetry = eventMessage.EventBody.ToObjectFromJson<LoRaTelemetry>()
};

if (loRaWanDevice.Telemetry.Any(telemetry => telemetry.Id.Equals(deviceTelemetry.Id, StringComparison.Ordinal)))
{
return;
}

loRaWanDevice.Telemetry.Add(deviceTelemetry);

await this.unitOfWork.SaveAsync();
KeepOnlyLatestTelemetry(loRaWanDevice);

await KeepOnlyLatestHundredTelemetry(loRaWanDevice);
await this.unitOfWork.SaveAsync();
}
catch (DbUpdateException e)
{
Expand All @@ -202,20 +223,19 @@ public override async Task ProcessTelemetryEvent(EventData eventMessage)
return;
}

private async Task KeepOnlyLatestHundredTelemetry(LorawanDevice loRaWanDevice)
private void KeepOnlyLatestTelemetry(LorawanDevice loRaWanDevice, int numberOfMessages = 100)
{
if (loRaWanDevice.Telemetry.Count <= 100) return;
if (loRaWanDevice.Telemetry.Count <= numberOfMessages) return;

loRaWanDevice.Telemetry
.OrderByDescending(telemetry => telemetry.EnqueuedTime)
.Skip(100)
.Skip(numberOfMessages)
.ToList()
.ForEach(telemetry =>
{
this.deviceTelemetryRepository.Delete(telemetry.Id);
_ = loRaWanDevice.Telemetry.Remove(telemetry);
kbeaugrand marked this conversation as resolved.
Show resolved Hide resolved
});

await this.unitOfWork.SaveAsync();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace AzureIoTHub.Portal.Tests.Unit.Server.Services
using System.Globalization;
using System.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using AutoFixture;
using AutoMapper;
Expand All @@ -18,6 +19,7 @@ namespace AzureIoTHub.Portal.Tests.Unit.Server.Services
using AzureIoTHub.Portal.Domain;
using AzureIoTHub.Portal.Domain.Exceptions;
using AzureIoTHub.Portal.Domain.Repositories;
using AzureIoTHub.Portal.Infrastructure;
using AzureIoTHub.Portal.Server.Services;
using AzureIoTHub.Portal.Shared.Models.v10;
using EntityFramework.Exceptions.Common;
Expand Down Expand Up @@ -80,6 +82,25 @@ public override void Setup()
Mapper = Services.GetRequiredService<IMapper>();
}

[Test]
public void ServiceCanDeserializeEventAuthMethod()
{
var options = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
Converters =
{
new JsonStringEnumConverter(JsonNamingPolicy.CamelCase)
}
};

var authMethodJson = /*lang=json,strict*/ "{\"scope\":\"module\",\"type\":\"sas\",\"issuer\":\"iothub\"}";

var eventAuthMethod = JsonSerializer.Deserialize<ConnectionAuthMethod>(authMethodJson.ToString(), options);

Assert.AreEqual(authMethodJson, JsonSerializer.Serialize(eventAuthMethod, options));
}

[Test]
public async Task GetDevice_DeviceExist_ReturnsExpectedDevice()
{
Expand Down Expand Up @@ -475,6 +496,81 @@ public async Task GetDeviceTelemetry_NonExistingDevice_ReturnsEmptyArray()
MockRepository.VerifyAll();
}

[Test]
public async Task WhenEventDataIsNull_ShouldNotProcess()
{
// Arrange

// Act
await this.lorawanDeviceService.ProcessTelemetryEvent(null);

// Assert
MockRepository.VerifyAll();
}

[Test]
public async Task WhenEventDataDoesntHaveSystemProperties_ShouldNotProcess()
{
// Arrange
var telemeryMessage = Fixture.Create<LoRaTelemetry>();
var enqueuedAt = Fixture.Create<DateTimeOffset>();
var sequenceNumber = Fixture.Create<long>();

var eventMessage = EventHubsModelFactory.EventData(new BinaryData(JsonSerializer.Serialize(telemeryMessage)), enqueuedTime: enqueuedAt, sequenceNumber: sequenceNumber);

// Act
await this.lorawanDeviceService.ProcessTelemetryEvent(eventMessage);

// Assert
MockRepository.VerifyAll();
}


[Test]
public async Task WhenEventDataIsNotFromDevice_ShouldNotProcess()
{
// Arrange
var telemeryMessage = Fixture.Create<LoRaTelemetry>();
var enqueuedAt = Fixture.Create<DateTimeOffset>();
var sequenceNumber = Fixture.Create<long>();

var systemProperties = new Dictionary<string, object>
{
{ "iothub-connection-auth-method" , /*lang=json,strict*/ "{\"scope\":\"module\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}" }
}.AsReadOnly();

var eventMessage = EventHubsModelFactory.EventData(new BinaryData(JsonSerializer.Serialize(telemeryMessage)), enqueuedTime: enqueuedAt, sequenceNumber: sequenceNumber, systemProperties: systemProperties);

// Act
await this.lorawanDeviceService.ProcessTelemetryEvent(eventMessage);

// Assert
MockRepository.VerifyAll();
}

[Test]
public async Task WhenEventDataDoesntHaveDeviceIdInSystemProperties_ShouldNotProcess()
{
// Arrange
var telemeryMessage = Fixture.Create<LoRaTelemetry>();
var enqueuedAt = Fixture.Create<DateTimeOffset>();
var sequenceNumber = Fixture.Create<long>();

var systemProperties = new Dictionary<string, object>
{
{ "iothub-connection-auth-method" , /*lang=json,strict*/ "{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}" }
}.AsReadOnly();

var eventMessage = EventHubsModelFactory.EventData(new BinaryData(JsonSerializer.Serialize(telemeryMessage)), enqueuedTime: enqueuedAt, sequenceNumber: sequenceNumber, systemProperties: systemProperties);

// Act
await this.lorawanDeviceService.ProcessTelemetryEvent(eventMessage);

// Assert
MockRepository.VerifyAll();
}


[Test]
public async Task ProcessTelemetryEvent_EventDataForExistingDevice_TelemetryIsAddedToDevice()
{
Expand All @@ -488,7 +584,13 @@ public async Task ProcessTelemetryEvent_EventDataForExistingDevice_TelemetryIsAd
Id = telemeryMessage.DeviceEUI
};

var eventMessage = EventHubsModelFactory.EventData(new BinaryData(JsonSerializer.Serialize(telemeryMessage)), enqueuedTime: enqueuedAt, sequenceNumber: sequenceNumber);
var systemProperties = new Dictionary<string, object>
{
{ "iothub-connection-device-id", telemeryMessage.DeviceEUI },
{ "iothub-connection-auth-method" , /*lang=json,strict*/ "{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}" }
}.AsReadOnly();

var eventMessage = EventHubsModelFactory.EventData(new BinaryData(JsonSerializer.Serialize(telemeryMessage)), enqueuedTime: enqueuedAt, sequenceNumber: sequenceNumber, systemProperties: systemProperties);

_ = this.mockLorawanDeviceRepository.Setup(repository => repository.GetByIdAsync(telemeryMessage.DeviceEUI, d => d.Telemetry))
.ReturnsAsync(lorawanDevice);
Expand Down Expand Up @@ -516,7 +618,13 @@ public async Task ProcessTelemetryEvent_DbUpdateExceptionIsThrown_NothingIsDone(
Id = telemeryMessage.DeviceEUI
};

var eventMessage = EventHubsModelFactory.EventData(new BinaryData(JsonSerializer.Serialize(telemeryMessage)), enqueuedTime: enqueuedAt, sequenceNumber: sequenceNumber);
var systemProperties = new Dictionary<string, object>
{
{ "iothub-connection-device-id", telemeryMessage.DeviceEUI },
{ "iothub-connection-auth-method" , /*lang=json,strict*/ "{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}" }
}.AsReadOnly();

var eventMessage = EventHubsModelFactory.EventData(new BinaryData(JsonSerializer.Serialize(telemeryMessage)), enqueuedTime: enqueuedAt, sequenceNumber: sequenceNumber, systemProperties: systemProperties);

_ = this.mockLorawanDeviceRepository.Setup(repository => repository.GetByIdAsync(telemeryMessage.DeviceEUI, d => d.Telemetry))
.ReturnsAsync(lorawanDevice);
Expand Down Expand Up @@ -569,7 +677,13 @@ public async Task ProcessTelemetryEvent_EventDataForExistingDeviceWithSameTeleme
}
};

var eventMessage = EventHubsModelFactory.EventData(new BinaryData(JsonSerializer.Serialize(telemeryMessage)), enqueuedTime: enqueuedAt, sequenceNumber: sequenceNumber);
var systemProperties = new Dictionary<string, object>
{
{ "iothub-connection-device-id", telemeryMessage.DeviceEUI },
{ "iothub-connection-auth-method" , /*lang=json,strict*/ "{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}" }
}.AsReadOnly();

var eventMessage = EventHubsModelFactory.EventData(new BinaryData(JsonSerializer.Serialize(telemeryMessage)), enqueuedTime: enqueuedAt, sequenceNumber: sequenceNumber, systemProperties: systemProperties);

_ = this.mockLorawanDeviceRepository.Setup(repository => repository.GetByIdAsync(telemeryMessage.DeviceEUI, d => d.Telemetry))
.ReturnsAsync(lorawanDevice);
Expand All @@ -589,7 +703,13 @@ public async Task ProcessTelemetryEvent_EventDataForNonExistingDevice_TelemetryI
var enqueuedAt = Fixture.Create<DateTimeOffset>();
var sequenceNumber = Fixture.Create<long>();

var eventMessage = EventHubsModelFactory.EventData(new BinaryData(JsonSerializer.Serialize(telemeryMessage)), enqueuedTime: enqueuedAt, sequenceNumber: sequenceNumber);
var systemProperties = new Dictionary<string, object>
{
{ "iothub-connection-device-id", telemeryMessage.DeviceEUI },
{ "iothub-connection-auth-method" , /*lang=json,strict*/ "{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}" }
}.AsReadOnly();

var eventMessage = EventHubsModelFactory.EventData(new BinaryData(JsonSerializer.Serialize(telemeryMessage)), enqueuedTime: enqueuedAt, sequenceNumber: sequenceNumber, systemProperties: systemProperties);

_ = this.mockLorawanDeviceRepository.Setup(repository => repository.GetByIdAsync(telemeryMessage.DeviceEUI, d => d.Telemetry))
.ReturnsAsync((LorawanDevice)null);
Expand All @@ -615,7 +735,13 @@ public async Task ProcessTelemetryEvent_NewEventDataForExistingDeviceWith100Tele
Telemetry = Fixture.CreateMany<LoRaDeviceTelemetry>(100).ToList()
};

var eventMessage = EventHubsModelFactory.EventData(new BinaryData(JsonSerializer.Serialize(telemeryMessage)), enqueuedTime: enqueuedAt, sequenceNumber: sequenceNumber);
var systemProperties = new Dictionary<string, object>
{
{ "iothub-connection-device-id", telemeryMessage.DeviceEUI },
{ "iothub-connection-auth-method" , /*lang=json,strict*/ "{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\",\"acceptingIpFilterRule\":null}" }
}.AsReadOnly();

var eventMessage = EventHubsModelFactory.EventData(new BinaryData(JsonSerializer.Serialize(telemeryMessage)), enqueuedTime: enqueuedAt, sequenceNumber: sequenceNumber, systemProperties: systemProperties);

_ = this.mockLorawanDeviceRepository.Setup(repository => repository.GetByIdAsync(telemeryMessage.DeviceEUI, d => d.Telemetry))
.ReturnsAsync(lorawanDevice);
Expand All @@ -631,8 +757,8 @@ public async Task ProcessTelemetryEvent_NewEventDataForExistingDeviceWith100Tele

// Assert
MockRepository.VerifyAll();
this.mockLoRaDeviceTelemetryRepository.Verify(repository => repository.Delete(It.IsAny<string>()), Times.Once);
this.mockUnitOfWork.Verify(work => work.SaveAsync(), Times.Exactly(2));
this.mockUnitOfWork.Verify(work => work.SaveAsync(), Times.Exactly(1));
_ = lorawanDevice.Telemetry.Should().HaveCount(100);
}

[Test]
Expand Down