Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#2075 Sync AWS IoT Edge Device #2148

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ public AWSDeviceThingProfile()
Name = att.Key,
Value = att.Value
})));

_ = CreateMap<DescribeThingResponse, EdgeDevice>()
.ForMember(dest => dest.Id, opts => opts.MapFrom(src => src.ThingId))
.ForMember(dest => dest.Name, opts => opts.MapFrom(src => src.ThingName))
.ForMember(dest => dest.Version, opts => opts.MapFrom(src => src.Version))
.ForMember(dest => dest.Tags, opts => opts.MapFrom(src => src.Attributes.Select(att => new DeviceTagValue
{
Name = att.Key,
Value = att.Value
})));
}

private static MemoryStream EmptyPayload()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ namespace AzureIoTHub.Portal.Domain.Repositories

public interface IDeviceModelRepository : IRepository<DeviceModel>
{
public DeviceModel? GetByName(string modelName);
Task<DeviceModel?> GetByNameAsync(string modelName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ namespace AzureIoTHub.Portal.Domain.Repositories

public interface IEdgeDeviceModelRepository : IRepository<EdgeDeviceModel>
{
Task<EdgeDeviceModel?> GetByNameAsync(string edgeModelDevice);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Copyright (c) CGI France. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace AzureIoTHub.Portal.Infrastructure.Jobs.AWS
{
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Amazon.GreengrassV2;
using Amazon.GreengrassV2.Model;
using Amazon.IoT;
using Amazon.IoT.Model;
using Amazon.SecretsManager.Model;
using AutoMapper;
using AzureIoTHub.Portal.Application.Services;
using AzureIoTHub.Portal.Application.Services.AWS;
using AzureIoTHub.Portal.Domain;
using AzureIoTHub.Portal.Domain.Entities;
using AzureIoTHub.Portal.Domain.Repositories;
using AzureIoTHub.Portal.Models.v10;
using Microsoft.Extensions.Logging;
using Quartz;
using Quartz.Util;

[DisallowConcurrentExecution]
public class SyncGreenGrassDevicesJob : IJob
{

private readonly ILogger<SyncGreenGrassDevicesJob> logger;
private readonly IMapper mapper;
private readonly IUnitOfWork unitOfWork;
private readonly IEdgeDeviceRepository edgeDeviceRepository;
private readonly IEdgeDeviceModelRepository edgeDeviceModelRepository;
private readonly IDeviceTagValueRepository deviceTagValueRepository;
private readonly IAmazonIoT amazonIoTClient;
private readonly IAmazonGreengrassV2 amazonGreenGrass;
private readonly IConfigService configService;
private readonly IAWSExternalDeviceService awsExternalDevicesService;

public SyncGreenGrassDevicesJob(
ILogger<SyncGreenGrassDevicesJob> logger,
IMapper mapper,
IUnitOfWork unitOfWork,
IEdgeDeviceRepository edgeDeviceRepository,
IEdgeDeviceModelRepository edgeDeviceModelRepository,
IDeviceTagValueRepository deviceTagValueRepository,
IAmazonIoT amazonIoTClient,
IAmazonGreengrassV2 amazonGreenGrass,
IConfigService configService,
IAWSExternalDeviceService awsExternalDevicesService)
{
this.mapper = mapper;
this.unitOfWork = unitOfWork;
this.edgeDeviceRepository = edgeDeviceRepository;
this.edgeDeviceModelRepository = edgeDeviceModelRepository;
this.deviceTagValueRepository = deviceTagValueRepository;
this.amazonIoTClient = amazonIoTClient;
this.amazonGreenGrass = amazonGreenGrass;
this.configService = configService;
this.awsExternalDevicesService = awsExternalDevicesService;
this.logger = logger;
}


public async Task Execute(IJobExecutionContext context)
{
try
{
this.logger.LogInformation("Start of sync GreenGrass Devices job");

await SyncGreenGrassDevicesAsEdgeDevices();

this.logger.LogInformation("End of sync GreenGrass Devices job");
}
catch (Exception e)
{
this.logger.LogError(e, "Sync GreenGrass Devices job has failed");
}
Comment on lines +75 to +78

Check notice

Code scanning / CodeQL

Generic catch clause

Generic catch clause.
}

private async Task SyncGreenGrassDevicesAsEdgeDevices()
{
var things = await GetAllGreenGrassDevices();

foreach (var thing in things)
{
//Thing error
if (thing.HttpStatusCode != HttpStatusCode.OK)
{
this.logger.LogWarning($"Cannot import device '{thing.ThingName}' due to an error in the Amazon IoT API : {thing.HttpStatusCode}");
continue;
}

//ThingType not specified
if (thing.ThingTypeName.IsNullOrWhiteSpace())
{
this.logger.LogInformation($"Cannot import Greengrass device '{thing.ThingName}' since it doesn't have related thing type.");
continue;
}

//EdgeDeviceModel not find in DB
var edgeDeviceModel = await this.edgeDeviceModelRepository.GetByNameAsync(thing.ThingTypeName);
if (edgeDeviceModel == null)
{
this.logger.LogWarning($"Cannot import Greengrass device '{thing.ThingName}'. The EdgeDeviceModel '{thing.ThingTypeName}' doesn't exist");
continue;
}

//Map with EdgeDevice
var edgeDevice = this.mapper.Map<EdgeDevice>(thing);
edgeDevice.DeviceModelId = edgeDeviceModel.Id;
//EdgeDevices properties that are not present in the thing
try
{
var modules = await this.configService.GetConfigModuleList(edgeDevice.DeviceModelId);
edgeDevice.NbDevices = await this.awsExternalDevicesService.GetEdgeDeviceNbDevices(this.mapper.Map<IoTEdgeDevice>(edgeDevice));
edgeDevice.NbModules = modules.Count;
var coreDevice = await amazonGreenGrass.GetCoreDeviceAsync(new GetCoreDeviceRequest() { CoreDeviceThingName = thing.ThingName });
if (coreDevice.HttpStatusCode != HttpStatusCode.OK)
{
this.logger.LogWarning($"Cannot import Greengrass device '{thing.ThingName}' due to an error retrieving core device in the Amazon IoT Data API : {coreDevice.HttpStatusCode}");
continue;
}
edgeDevice.ConnectionState = coreDevice.Status == CoreDeviceStatus.HEALTHY ? "Connected" : "Disconnected";
}
catch (Exception e)
{
this.logger.LogWarning($"Cannot import Greengrass device '{thing.ThingName}' due to an error retrieving Greengrass device properties in the Amazon IoT Data API.", e);
continue;
}
Comment on lines +126 to +130

Check notice

Code scanning / CodeQL

Generic catch clause

Generic catch clause.

//Create or update the Edge Device
await CreateOrUpdateGreenGrassDevice(edgeDevice);
}

foreach (var item in (await this.edgeDeviceRepository.GetAllAsync(
edgeDevice => !things.Select(x => x.ThingId).Contains(edgeDevice.Id),
default,
d => d.Tags,
d => d.Labels
)))
{
this.edgeDeviceRepository.Delete(item.Id);
}

await this.unitOfWork.SaveAsync();
}

private async Task<List<DescribeThingResponse>> GetAllGreenGrassDevices()
{
var devices = new List<DescribeThingResponse>();

var nextToken = string.Empty;

var response = await amazonGreenGrass.ListCoreDevicesAsync(
new ListCoreDevicesRequest
{
NextToken = nextToken
});

foreach (var requestDescribeThing in response.CoreDevices.Select(device => new DescribeThingRequest { ThingName = device.CoreDeviceThingName }))
kbeaugrand marked this conversation as resolved.
Show resolved Hide resolved
{
try
{
devices.Add(await this.amazonIoTClient.DescribeThingAsync(requestDescribeThing));
}
catch (AmazonIoTException e)
{
this.logger.LogWarning($"Cannot import Greengrass device '{requestDescribeThing.ThingName}' due to an error in the Amazon IoT API.", e);
continue;
}
}

return devices;
}

private async Task CreateOrUpdateGreenGrassDevice(EdgeDevice edgeDevice)
{
var edgeDeviceEntity = await this.edgeDeviceRepository.GetByIdAsync(edgeDevice.Id, d => d.Tags);

if (edgeDeviceEntity == null)
{
await this.edgeDeviceRepository.InsertAsync(edgeDevice);
}
else
{
if (edgeDeviceEntity.Version >= edgeDevice.Version) return;

foreach (var deviceTagEntity in edgeDeviceEntity.Tags)
{
this.deviceTagValueRepository.Delete(deviceTagEntity.Id);
}

_ = this.mapper.Map(edgeDevice, edgeDeviceEntity);
this.edgeDeviceRepository.Update(edgeDeviceEntity);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ private async Task SyncThingsAsDevices()
continue;
}

//ThingType not find in DB
var deviceModel = this.deviceModelRepository.GetByName(thing.ThingTypeName);
//DeviceModel not find in DB
var deviceModel = await this.deviceModelRepository.GetByNameAsync(thing.ThingTypeName);
if (deviceModel == null)
{
this.logger.LogWarning($"Cannot import device '{thing.ThingName}'. The ThingType '{thing.ThingTypeName}' doesn't exist");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ namespace AzureIoTHub.Portal.Infrastructure.Repositories
{
using AzureIoTHub.Portal.Domain.Repositories;
using Domain.Entities;
using Microsoft.EntityFrameworkCore;

public class DeviceModelRepository : GenericRepository<DeviceModel>, IDeviceModelRepository
{
public DeviceModelRepository(PortalDbContext context) : base(context)
{
}

public DeviceModel? GetByName(string modelName)
public async Task<DeviceModel?> GetByNameAsync(string modelName)
{
return GetAll().FirstOrDefault(model => model.Name.Equals(modelName, StringComparison.Ordinal));
return await this.context.Set<DeviceModel>()
.FirstOrDefaultAsync(deviceModel => deviceModel.Name == modelName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ namespace AzureIoTHub.Portal.Infrastructure.Repositories
{
using Domain.Entities;
using Domain.Repositories;
using Microsoft.EntityFrameworkCore;

public class EdgeDeviceModelRepository : GenericRepository<EdgeDeviceModel>, IEdgeDeviceModelRepository
{
public EdgeDeviceModelRepository(PortalDbContext context) : base(context)
{
}

public async Task<EdgeDeviceModel?> GetByNameAsync(string edgeModelDevice)
{
return await this.context.Set<EdgeDeviceModel>()
.FirstOrDefaultAsync(edgeDeviceModel => edgeDeviceModel.Name == edgeModelDevice);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace AzureIoTHub.Portal.Infrastructure.Services
{
using System;
using System.Threading.Tasks;
using Amazon.GreengrassV2;
using Amazon.IoT.Model;
using AutoMapper;
using AzureIoTHub.Portal.Application.Managers;
Expand Down Expand Up @@ -137,6 +138,7 @@ public async Task<IoTEdgeDevice> GetEdgeDevice(string edgeDeviceId)
deviceDto.Modules = await this.configService.GetConfigModuleList(model.ExternalIdentifier!);
deviceDto.NbDevices = await this.awsExternalDevicesService.GetEdgeDeviceNbDevices(deviceDto);
deviceDto.NbModules = deviceDto.Modules.Count;
deviceDto.ConnectionState = deviceDto.RuntimeResponse == CoreDeviceStatus.HEALTHY ? "Connected" : "Disconnected";

return deviceDto;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ private static IServiceCollection ConfigureAWSSyncJobs(this IServiceCollection s
.WithSimpleSchedule(s => s
.WithIntervalInMinutes(configuration.SyncDatabaseJobRefreshIntervalInMinutes)
.RepeatForever()));

_ = q.AddJob<SyncGreenGrassDevicesJob>(j => j.WithIdentity(nameof(SyncGreenGrassDevicesJob)))
.AddTrigger(t => t
.WithIdentity($"{nameof(SyncGreenGrassDevicesJob)}")
.ForJob(nameof(SyncGreenGrassDevicesJob))
.WithSimpleSchedule(s => s
.WithIntervalInMinutes(configuration.SyncDatabaseJobRefreshIntervalInMinutes)
.RepeatForever()));
});
}

Expand Down
Loading