Skip to content

Commit

Permalink
#2075 Sync AWS IoT Edge Device
Browse files Browse the repository at this point in the history
- Sync AWS IoT Edge Device
- TU sync IoT Edge Device
- GetByName async for DeviceModel and EdgeDeviceModel
  • Loading branch information
delager authored and kbeaugrand committed Jun 5, 2023
1 parent 232e9b5 commit 8df7580
Show file tree
Hide file tree
Showing 9 changed files with 700 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ public AWSDeviceThingProfile()
Name = att.Key,
Value = att.Value
})));

_ = CreateMap<DescribeThingResponse, EdgeDevice>()
.ForMember(dest => dest.Id, opts => opts.MapFrom(src => src.ThingId))
.ForMember(dest => dest.Name, opts => opts.MapFrom(src => src.ThingName))
.ForMember(dest => dest.Version, opts => opts.MapFrom(src => src.Version))
.ForMember(dest => dest.Tags, opts => opts.MapFrom(src => src.Attributes.Select(att => new DeviceTagValue
{
Name = att.Key,
Value = att.Value
})));
}

private static MemoryStream EmptyPayload()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ namespace AzureIoTHub.Portal.Domain.Repositories

public interface IDeviceModelRepository : IRepository<DeviceModel>
{
public DeviceModel? GetByName(string modelName);
Task<DeviceModel?> GetByNameAsync(string modelName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ namespace AzureIoTHub.Portal.Domain.Repositories

public interface IEdgeDeviceModelRepository : IRepository<EdgeDeviceModel>
{
Task<EdgeDeviceModel?> GetByNameAsync(string edgeModelDevice);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Copyright (c) CGI France. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace AzureIoTHub.Portal.Infrastructure.Jobs.AWS
{
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Amazon.GreengrassV2;
using Amazon.GreengrassV2.Model;
using Amazon.IoT;
using Amazon.IoT.Model;
using AutoMapper;
using AzureIoTHub.Portal.Domain;
using AzureIoTHub.Portal.Domain.Entities;
using AzureIoTHub.Portal.Domain.Repositories;
using Microsoft.Extensions.Logging;
using Quartz;
using Quartz.Util;

[DisallowConcurrentExecution]
public class SyncGreenGrassDevicesJob : IJob
{

private readonly ILogger<SyncGreenGrassDevicesJob> logger;
private readonly IMapper mapper;
private readonly IUnitOfWork unitOfWork;
private readonly IEdgeDeviceRepository edgeDeviceRepository;
private readonly IEdgeDeviceModelRepository edgeDeviceModelRepository;
private readonly IDeviceTagValueRepository deviceTagValueRepository;
private readonly IAmazonIoT amazonIoTClient;
private readonly IAmazonGreengrassV2 amazonGreenGrass;

public SyncGreenGrassDevicesJob(
ILogger<SyncGreenGrassDevicesJob> logger,
IMapper mapper,
IUnitOfWork unitOfWork,
IEdgeDeviceRepository edgeDeviceRepository,
IEdgeDeviceModelRepository edgeDeviceModelRepository,
IDeviceTagValueRepository deviceTagValueRepository,
IAmazonIoT amazonIoTClient,
IAmazonGreengrassV2 amazonGreenGrass)
{
this.mapper = mapper;
this.unitOfWork = unitOfWork;
this.edgeDeviceRepository = edgeDeviceRepository;
this.edgeDeviceModelRepository = edgeDeviceModelRepository;
this.deviceTagValueRepository = deviceTagValueRepository;
this.amazonIoTClient = amazonIoTClient;
this.amazonGreenGrass = amazonGreenGrass;
this.logger = logger;
}


public async Task Execute(IJobExecutionContext context)
{
try
{
this.logger.LogInformation("Start of sync GreenGrass Devices job");

await SyncGreenGrassDevicesAsEdgeDevices();

this.logger.LogInformation("End of sync GreenGrass Devices job");
}
catch (Exception e)
{
this.logger.LogError(e, "Sync GreenGrass Devices job has failed");
}
}

private async Task SyncGreenGrassDevicesAsEdgeDevices()
{
var things = await GetAllGreenGrassDevices();

foreach (var thing in things)
{
//Thing error
if (thing.HttpStatusCode != HttpStatusCode.OK)
{
this.logger.LogWarning($"Cannot import device '{thing.ThingName}' due to an error in the Amazon IoT API : {thing.HttpStatusCode}");
continue;
}

//ThingType not specified
if (thing.ThingTypeName.IsNullOrWhiteSpace())
{
this.logger.LogInformation($"Cannot import Greengrass device '{thing.ThingName}' since it doesn't have related thing type.");
continue;
}

//EdgeDeviceModel not find in DB
var edgeDeviceModel = await this.edgeDeviceModelRepository.GetByNameAsync(thing.ThingTypeName);
if (edgeDeviceModel == null)
{
this.logger.LogWarning($"Cannot import Greengrass device '{thing.ThingName}'. The EdgeDeviceModel '{thing.ThingTypeName}' doesn't exist");
continue;
}

//Create or update the Edge Device
await CreateOrUpdateGreenGrassDevice(thing, edgeDeviceModel);
}

foreach (var item in (await this.edgeDeviceRepository.GetAllAsync(
edgeDevice => !things.Select(x => x.ThingId).Contains(edgeDevice.Id),
default,
d => d.Tags,
d => d.Labels
)))
{
this.edgeDeviceRepository.Delete(item.Id);
}

await this.unitOfWork.SaveAsync();
}

private async Task<List<DescribeThingResponse>> GetAllGreenGrassDevices()
{
var devices = new List<DescribeThingResponse>();

var nextToken = string.Empty;

var response = await amazonGreenGrass.ListCoreDevicesAsync(
new ListCoreDevicesRequest
{
NextToken = nextToken
});

foreach (var requestDescribeThing in response.CoreDevices.Select(device => new DescribeThingRequest { ThingName = device.CoreDeviceThingName }))
{
try
{
devices.Add(await this.amazonIoTClient.DescribeThingAsync(requestDescribeThing));
}
catch (AmazonIoTException e)
{
this.logger.LogWarning($"Cannot import Greengrass device '{requestDescribeThing.ThingName}' due to an error in the Amazon IoT API.", e);
continue;
}
}

return devices;
}

private async Task CreateOrUpdateGreenGrassDevice(DescribeThingResponse greengrassDevice, EdgeDeviceModel edgeModelDevice)
{
var edgeDevice = this.mapper.Map<EdgeDevice>(greengrassDevice);
var edgeDeviceEntity = await this.edgeDeviceRepository.GetByIdAsync(edgeDevice.Id, d => d.Tags);
edgeDevice.DeviceModelId = edgeModelDevice.Id;

if (edgeDeviceEntity == null)
{
await this.edgeDeviceRepository.InsertAsync(edgeDevice);
}
else
{
if (edgeDeviceEntity.Version >= edgeDevice.Version) return;

foreach (var deviceTagEntity in edgeDeviceEntity.Tags)
{
this.deviceTagValueRepository.Delete(deviceTagEntity.Id);
}

_ = this.mapper.Map(edgeDevice, edgeDeviceEntity);
this.edgeDeviceRepository.Update(edgeDeviceEntity);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ private async Task SyncThingsAsDevices()
continue;
}

//ThingType not find in DB
var deviceModel = this.deviceModelRepository.GetByName(thing.ThingTypeName);
//DeviceModel not find in DB
var deviceModel = await this.deviceModelRepository.GetByNameAsync(thing.ThingTypeName);
if (deviceModel == null)
{
this.logger.LogWarning($"Cannot import device '{thing.ThingName}'. The ThingType '{thing.ThingTypeName}' doesn't exist");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ namespace AzureIoTHub.Portal.Infrastructure.Repositories
{
using AzureIoTHub.Portal.Domain.Repositories;
using Domain.Entities;
using Microsoft.EntityFrameworkCore;

public class DeviceModelRepository : GenericRepository<DeviceModel>, IDeviceModelRepository
{
public DeviceModelRepository(PortalDbContext context) : base(context)
{
}

public DeviceModel? GetByName(string modelName)
public async Task<DeviceModel?> GetByNameAsync(string modelName)
{
return GetAll().FirstOrDefault(model => model.Name.Equals(modelName, StringComparison.Ordinal));
return await this.context.Set<DeviceModel>()
.FirstOrDefaultAsync(deviceModel => deviceModel.Name.Equals(modelName, StringComparison.Ordinal));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ namespace AzureIoTHub.Portal.Infrastructure.Repositories
{
using Domain.Entities;
using Domain.Repositories;
using Microsoft.EntityFrameworkCore;

public class EdgeDeviceModelRepository : GenericRepository<EdgeDeviceModel>, IEdgeDeviceModelRepository
{
public EdgeDeviceModelRepository(PortalDbContext context) : base(context)
{
}

public async Task<EdgeDeviceModel?> GetByNameAsync(string edgeModelDevice)
{
return await this.context.Set<EdgeDeviceModel>()
.FirstOrDefaultAsync(edgeDeviceModel => edgeDeviceModel.Name.Equals(edgeModelDevice, StringComparison.Ordinal));
}
}
}
Loading

0 comments on commit 8df7580

Please sign in to comment.