Skip to content

Commit

Permalink
#2075 Sync AWS IoT Edge Device (#2148)
Browse files Browse the repository at this point in the history
* #2075 Sync AWS IoT Edge Device

- Sync AWS IoT Edge Device
- TU sync IoT Edge Device
- GetByName async for DeviceModel and EdgeDeviceModel

* #2075 Add GetByNameAsync TU

* #2075 Add greengras devices job to quartz

* #2075 Fix GetByNameAsync

* #2075 Add EdgeDevice properties

- NbModules / NbDevices / ConnectionState
  • Loading branch information
delager authored and kbeaugrand committed Jun 23, 2023
1 parent 5c6f620 commit 0f3e901
Show file tree
Hide file tree
Showing 13 changed files with 881 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ public AWSDeviceThingProfile()
Name = att.Key,
Value = att.Value
})));

_ = CreateMap<DescribeThingResponse, EdgeDevice>()
.ForMember(dest => dest.Id, opts => opts.MapFrom(src => src.ThingId))
.ForMember(dest => dest.Name, opts => opts.MapFrom(src => src.ThingName))
.ForMember(dest => dest.Version, opts => opts.MapFrom(src => src.Version))
.ForMember(dest => dest.Tags, opts => opts.MapFrom(src => src.Attributes.Select(att => new DeviceTagValue
{
Name = att.Key,
Value = att.Value
})));
}

private static MemoryStream EmptyPayload()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ namespace AzureIoTHub.Portal.Domain.Repositories

public interface IDeviceModelRepository : IRepository<DeviceModel>
{
public DeviceModel? GetByName(string modelName);
Task<DeviceModel?> GetByNameAsync(string modelName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ namespace AzureIoTHub.Portal.Domain.Repositories

public interface IEdgeDeviceModelRepository : IRepository<EdgeDeviceModel>
{
Task<EdgeDeviceModel?> GetByNameAsync(string edgeModelDevice);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Copyright (c) CGI France. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace AzureIoTHub.Portal.Infrastructure.Jobs.AWS
{
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Amazon.GreengrassV2;
using Amazon.GreengrassV2.Model;
using Amazon.IoT;
using Amazon.IoT.Model;
using Amazon.SecretsManager.Model;
using AutoMapper;
using AzureIoTHub.Portal.Application.Services;
using AzureIoTHub.Portal.Application.Services.AWS;
using AzureIoTHub.Portal.Domain;
using AzureIoTHub.Portal.Domain.Entities;
using AzureIoTHub.Portal.Domain.Repositories;
using AzureIoTHub.Portal.Models.v10;
using Microsoft.Extensions.Logging;
using Quartz;
using Quartz.Util;

[DisallowConcurrentExecution]
public class SyncGreenGrassDevicesJob : IJob
{

private readonly ILogger<SyncGreenGrassDevicesJob> logger;
private readonly IMapper mapper;
private readonly IUnitOfWork unitOfWork;
private readonly IEdgeDeviceRepository edgeDeviceRepository;
private readonly IEdgeDeviceModelRepository edgeDeviceModelRepository;
private readonly IDeviceTagValueRepository deviceTagValueRepository;
private readonly IAmazonIoT amazonIoTClient;
private readonly IAmazonGreengrassV2 amazonGreenGrass;
private readonly IConfigService configService;
private readonly IAWSExternalDeviceService awsExternalDevicesService;

public SyncGreenGrassDevicesJob(
ILogger<SyncGreenGrassDevicesJob> logger,
IMapper mapper,
IUnitOfWork unitOfWork,
IEdgeDeviceRepository edgeDeviceRepository,
IEdgeDeviceModelRepository edgeDeviceModelRepository,
IDeviceTagValueRepository deviceTagValueRepository,
IAmazonIoT amazonIoTClient,
IAmazonGreengrassV2 amazonGreenGrass,
IConfigService configService,
IAWSExternalDeviceService awsExternalDevicesService)
{
this.mapper = mapper;
this.unitOfWork = unitOfWork;
this.edgeDeviceRepository = edgeDeviceRepository;
this.edgeDeviceModelRepository = edgeDeviceModelRepository;
this.deviceTagValueRepository = deviceTagValueRepository;
this.amazonIoTClient = amazonIoTClient;
this.amazonGreenGrass = amazonGreenGrass;
this.configService = configService;
this.awsExternalDevicesService = awsExternalDevicesService;
this.logger = logger;
}


public async Task Execute(IJobExecutionContext context)
{
try
{
this.logger.LogInformation("Start of sync GreenGrass Devices job");

await SyncGreenGrassDevicesAsEdgeDevices();

this.logger.LogInformation("End of sync GreenGrass Devices job");
}
catch (Exception e)
{
this.logger.LogError(e, "Sync GreenGrass Devices job has failed");
}
}

private async Task SyncGreenGrassDevicesAsEdgeDevices()
{
var things = await GetAllGreenGrassDevices();

foreach (var thing in things)
{
//Thing error
if (thing.HttpStatusCode != HttpStatusCode.OK)
{
this.logger.LogWarning($"Cannot import device '{thing.ThingName}' due to an error in the Amazon IoT API : {thing.HttpStatusCode}");
continue;
}

//ThingType not specified
if (thing.ThingTypeName.IsNullOrWhiteSpace())
{
this.logger.LogInformation($"Cannot import Greengrass device '{thing.ThingName}' since it doesn't have related thing type.");
continue;
}

//EdgeDeviceModel not find in DB
var edgeDeviceModel = await this.edgeDeviceModelRepository.GetByNameAsync(thing.ThingTypeName);
if (edgeDeviceModel == null)
{
this.logger.LogWarning($"Cannot import Greengrass device '{thing.ThingName}'. The EdgeDeviceModel '{thing.ThingTypeName}' doesn't exist");
continue;
}

//Map with EdgeDevice
var edgeDevice = this.mapper.Map<EdgeDevice>(thing);
edgeDevice.DeviceModelId = edgeDeviceModel.Id;
//EdgeDevices properties that are not present in the thing
try
{
var modules = await this.configService.GetConfigModuleList(edgeDevice.DeviceModelId);
edgeDevice.NbDevices = await this.awsExternalDevicesService.GetEdgeDeviceNbDevices(this.mapper.Map<IoTEdgeDevice>(edgeDevice));
edgeDevice.NbModules = modules.Count;
var coreDevice = await amazonGreenGrass.GetCoreDeviceAsync(new GetCoreDeviceRequest() { CoreDeviceThingName = thing.ThingName });
if (coreDevice.HttpStatusCode != HttpStatusCode.OK)
{
this.logger.LogWarning($"Cannot import Greengrass device '{thing.ThingName}' due to an error retrieving core device in the Amazon IoT Data API : {coreDevice.HttpStatusCode}");
continue;
}
edgeDevice.ConnectionState = coreDevice.Status == CoreDeviceStatus.HEALTHY ? "Connected" : "Disconnected";
}
catch (Exception e)
{
this.logger.LogWarning($"Cannot import Greengrass device '{thing.ThingName}' due to an error retrieving Greengrass device properties in the Amazon IoT Data API.", e);
continue;
}

//Create or update the Edge Device
await CreateOrUpdateGreenGrassDevice(edgeDevice);
}

foreach (var item in (await this.edgeDeviceRepository.GetAllAsync(
edgeDevice => !things.Select(x => x.ThingId).Contains(edgeDevice.Id),
default,
d => d.Tags,
d => d.Labels
)))
{
this.edgeDeviceRepository.Delete(item.Id);
}

await this.unitOfWork.SaveAsync();
}

private async Task<List<DescribeThingResponse>> GetAllGreenGrassDevices()
{
var devices = new List<DescribeThingResponse>();

var nextToken = string.Empty;

var response = await amazonGreenGrass.ListCoreDevicesAsync(
new ListCoreDevicesRequest
{
NextToken = nextToken
});

foreach (var requestDescribeThing in response.CoreDevices.Select(device => new DescribeThingRequest { ThingName = device.CoreDeviceThingName }))
{
try
{
devices.Add(await this.amazonIoTClient.DescribeThingAsync(requestDescribeThing));
}
catch (AmazonIoTException e)
{
this.logger.LogWarning($"Cannot import Greengrass device '{requestDescribeThing.ThingName}' due to an error in the Amazon IoT API.", e);
continue;
}
}

return devices;
}

private async Task CreateOrUpdateGreenGrassDevice(EdgeDevice edgeDevice)
{
var edgeDeviceEntity = await this.edgeDeviceRepository.GetByIdAsync(edgeDevice.Id, d => d.Tags);

if (edgeDeviceEntity == null)
{
await this.edgeDeviceRepository.InsertAsync(edgeDevice);
}
else
{
if (edgeDeviceEntity.Version >= edgeDevice.Version) return;

foreach (var deviceTagEntity in edgeDeviceEntity.Tags)
{
this.deviceTagValueRepository.Delete(deviceTagEntity.Id);
}

_ = this.mapper.Map(edgeDevice, edgeDeviceEntity);
this.edgeDeviceRepository.Update(edgeDeviceEntity);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ private async Task SyncThingsAsDevices()
continue;
}

//ThingType not find in DB
var deviceModel = this.deviceModelRepository.GetByName(thing.ThingTypeName);
//DeviceModel not find in DB
var deviceModel = await this.deviceModelRepository.GetByNameAsync(thing.ThingTypeName);
if (deviceModel == null)
{
this.logger.LogWarning($"Cannot import device '{thing.ThingName}'. The ThingType '{thing.ThingTypeName}' doesn't exist");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ namespace AzureIoTHub.Portal.Infrastructure.Repositories
{
using AzureIoTHub.Portal.Domain.Repositories;
using Domain.Entities;
using Microsoft.EntityFrameworkCore;

public class DeviceModelRepository : GenericRepository<DeviceModel>, IDeviceModelRepository
{
public DeviceModelRepository(PortalDbContext context) : base(context)
{
}

public DeviceModel? GetByName(string modelName)
public async Task<DeviceModel?> GetByNameAsync(string modelName)
{
return GetAll().FirstOrDefault(model => model.Name.Equals(modelName, StringComparison.Ordinal));
return await this.context.Set<DeviceModel>()
.FirstOrDefaultAsync(deviceModel => deviceModel.Name == modelName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ namespace AzureIoTHub.Portal.Infrastructure.Repositories
{
using Domain.Entities;
using Domain.Repositories;
using Microsoft.EntityFrameworkCore;

public class EdgeDeviceModelRepository : GenericRepository<EdgeDeviceModel>, IEdgeDeviceModelRepository
{
public EdgeDeviceModelRepository(PortalDbContext context) : base(context)
{
}

public async Task<EdgeDeviceModel?> GetByNameAsync(string edgeModelDevice)
{
return await this.context.Set<EdgeDeviceModel>()
.FirstOrDefaultAsync(edgeDeviceModel => edgeDeviceModel.Name == edgeModelDevice);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace AzureIoTHub.Portal.Infrastructure.Services
{
using System;
using System.Threading.Tasks;
using Amazon.GreengrassV2;
using Amazon.IoT.Model;
using AutoMapper;
using AzureIoTHub.Portal.Application.Managers;
Expand Down Expand Up @@ -137,6 +138,7 @@ public async Task<IoTEdgeDevice> GetEdgeDevice(string edgeDeviceId)
deviceDto.Modules = await this.configService.GetConfigModuleList(model.ExternalIdentifier!);
deviceDto.NbDevices = await this.awsExternalDevicesService.GetEdgeDeviceNbDevices(deviceDto);
deviceDto.NbModules = deviceDto.Modules.Count;
deviceDto.ConnectionState = deviceDto.RuntimeResponse == CoreDeviceStatus.HEALTHY ? "Connected" : "Disconnected";

return deviceDto;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ private static IServiceCollection ConfigureAWSSyncJobs(this IServiceCollection s
.WithSimpleSchedule(s => s
.WithIntervalInMinutes(configuration.SyncDatabaseJobRefreshIntervalInMinutes)
.RepeatForever()));

_ = q.AddJob<SyncGreenGrassDevicesJob>(j => j.WithIdentity(nameof(SyncGreenGrassDevicesJob)))
.AddTrigger(t => t
.WithIdentity($"{nameof(SyncGreenGrassDevicesJob)}")
.ForJob(nameof(SyncGreenGrassDevicesJob))
.WithSimpleSchedule(s => s
.WithIntervalInMinutes(configuration.SyncDatabaseJobRefreshIntervalInMinutes)
.RepeatForever()));
});
}

Expand Down
Loading

0 comments on commit 0f3e901

Please sign in to comment.