Skip to content

Commit

Permalink
Sync AWS Things with portal (#2133)
Browse files Browse the repository at this point in the history
* #1924 : Sync Things Device

- Sync Things Job
- Add Job Quartz
- TU Sync Things Job

* #1924 Sync Iot Things

- Fix for deletion at sync
- Remove unused references

* #1924 Fix TU

* #1924 Fix code scan missed opportunity to use Select

* #1924 Add TU and update log message type

* #1924 Amazon Exception Handling

* #1924 Fix Throw Exception
  • Loading branch information
delager authored and kbeaugrand committed Jun 16, 2023
1 parent 79294ae commit 2be997c
Show file tree
Hide file tree
Showing 5 changed files with 813 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ public AWSDeviceThingProfile()
.ForMember(dest => dest.ThingName, opts => opts.MapFrom(src => src.DeviceName))
.ForMember(dest => dest.Payload, opts => opts.MapFrom(src => EmptyPayload()))
.ReverseMap();

_ = CreateMap<DescribeThingResponse, Device>()
.ForMember(dest => dest.Id, opts => opts.MapFrom(src => src.ThingId))
.ForMember(dest => dest.Name, opts => opts.MapFrom(src => src.ThingName))
.ForMember(dest => dest.Version, opts => opts.MapFrom(src => src.Version))
.ForMember(dest => dest.Tags, opts => opts.MapFrom(src => src.Attributes.Select(att => new DeviceTagValue
{
Name = att.Key,
Value = att.Value
})));
}

private static MemoryStream EmptyPayload()
Expand Down
3 changes: 2 additions & 1 deletion src/AzureIoTHub.Portal.Domain/Entities/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace AzureIoTHub.Portal.Domain.Entities
{
using System.Collections.ObjectModel;
using AzureIoTHub.Portal.Domain.Base;

public class Device : EntityBase
Expand Down Expand Up @@ -50,6 +51,6 @@ public class Device : EntityBase
/// <summary>
/// List of custom device tags and their values.
/// </summary>
public ICollection<DeviceTagValue> Tags { get; set; } = default!;
public ICollection<DeviceTagValue> Tags { get; set; } = new Collection<DeviceTagValue>();
}
}
185 changes: 185 additions & 0 deletions src/AzureIoTHub.Portal.Infrastructure/Jobs/AWS/SyncThingsJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright (c) CGI France. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace AzureIoTHub.Portal.Infrastructure.Jobs.AWS
{
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Amazon.IoT;
using Amazon.IoT.Model;
using Amazon.IotData;
using Amazon.IotData.Model;
using AutoMapper;
using AzureIoTHub.Portal.Domain;
using AzureIoTHub.Portal.Domain.Entities;
using AzureIoTHub.Portal.Domain.Repositories;
using Microsoft.Extensions.Logging;
using Quartz;
using Quartz.Util;

[DisallowConcurrentExecution]
public class SyncThingsJob : IJob
{

private readonly ILogger<SyncThingsJob> logger;
private readonly IMapper mapper;
private readonly IUnitOfWork unitOfWork;
private readonly IDeviceRepository deviceRepository;
private readonly IDeviceModelRepository deviceModelRepository;
private readonly IDeviceTagValueRepository deviceTagValueRepository;
private readonly IAmazonIoT amazonIoTClient;
private readonly IAmazonIotData amazonIoTDataClient;

public SyncThingsJob(
ILogger<SyncThingsJob> logger,
IMapper mapper,
IUnitOfWork unitOfWork,
IDeviceRepository deviceRepository,
IDeviceModelRepository deviceModelRepository,
IDeviceTagValueRepository deviceTagValueRepository,
IAmazonIoT amazonIoTClient,
IAmazonIotData amazonIoTDataClient)
{
this.mapper = mapper;
this.unitOfWork = unitOfWork;
this.deviceRepository = deviceRepository;
this.deviceModelRepository = deviceModelRepository;
this.deviceTagValueRepository = deviceTagValueRepository;
this.amazonIoTClient = amazonIoTClient;
this.amazonIoTDataClient = amazonIoTDataClient;
this.logger = logger;
}


public async Task Execute(IJobExecutionContext context)
{
try
{
this.logger.LogInformation("Start of sync Things job");

await SyncThingsAsDevices();

this.logger.LogInformation("End of sync Things job");
}
catch (Exception e)
{
this.logger.LogError(e, "Sync Things job has failed");
}
}

private async Task SyncThingsAsDevices()
{
var things = await GetAllThings();

foreach (var thing in things)
{
//Thing error
if (thing.HttpStatusCode != HttpStatusCode.OK)
{
this.logger.LogWarning($"Cannot import device '{thing.ThingName}' due to an error in the Amazon IoT API : {thing.HttpStatusCode}");
continue;
}

//ThingType not specified
if (thing.ThingTypeName.IsNullOrWhiteSpace())
{
this.logger.LogInformation($"Cannot import device '{thing.ThingName}' since it doesn't have related thing type.");
continue;
}

//ThingType not find in DB
var deviceModel = this.deviceModelRepository.GetByName(thing.ThingTypeName);
if (deviceModel == null)
{
this.logger.LogWarning($"Cannot import device '{thing.ThingName}'. The ThingType '{thing.ThingTypeName}' doesn't exist");
continue;
}

//ThingShadow not specified
var thingShadowRequest = new GetThingShadowRequest()
{
ThingName = thing.ThingName
};
try
{
var thingShadow = await this.amazonIoTDataClient.GetThingShadowAsync(thingShadowRequest);
if (thingShadow.HttpStatusCode != HttpStatusCode.OK)
{
if (thingShadow.HttpStatusCode.Equals(HttpStatusCode.NotFound))
this.logger.LogInformation($"Cannot import device '{thing.ThingName}' since it doesn't have related classic thing shadow");
else
this.logger.LogWarning($"Cannot import device '{thing.ThingName}' due to an error retrieving thing shadow in the Amazon IoT API : {thingShadow.HttpStatusCode}");
continue;
}
}
catch (AmazonIotDataException e)
{
this.logger.LogWarning($"Cannot import device '{thing.ThingName}' due to an error retrieving thing shadow in the Amazon IoT Data API.", e);
continue;
}

//Create or update the thing
await CreateOrUpdateThing(thing, deviceModel);
}

foreach (var item in (await this.deviceRepository.GetAllAsync(
device => !things.Select(x => x.ThingId).Contains(device.Id),
default,
d => d.Tags,
d => d.Labels
)))
{
this.deviceRepository.Delete(item.Id);
}

await this.unitOfWork.SaveAsync();
}

private async Task<List<DescribeThingResponse>> GetAllThings()
{
var things = new List<DescribeThingResponse>();

var response = await amazonIoTClient.ListThingsAsync();

foreach (var requestDescribeThing in response.Things.Select(thing => new DescribeThingRequest { ThingName = thing.ThingName }))
{
try
{
things.Add(await this.amazonIoTClient.DescribeThingAsync(requestDescribeThing));
}
catch (AmazonIoTException e)
{
this.logger.LogWarning($"Cannot import device '{requestDescribeThing.ThingName}' due to an error in the Amazon IoT API.", e);
continue;
}
}

return things;
}

private async Task CreateOrUpdateThing(DescribeThingResponse thing, DeviceModel deviceModel)
{
var device = this.mapper.Map<Device>(thing);
var deviceEntity = await this.deviceRepository.GetByIdAsync(device.Id, d => d.Tags);
device.DeviceModelId = deviceModel.Id;

if (deviceEntity == null)
{
await this.deviceRepository.InsertAsync(device);
}
else
{
if (deviceEntity.Version >= device.Version) return;

foreach (var deviceTagEntity in deviceEntity.Tags)
{
this.deviceTagValueRepository.Delete(deviceTagEntity.Id);
}

_ = this.mapper.Map(device, deviceEntity);
this.deviceRepository.Update(deviceEntity);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,15 @@ private static IServiceCollection ConfigureAWSSyncJobs(this IServiceCollection s
.AddTrigger(t => t
.WithIdentity($"{nameof(SyncThingTypesJob)}")
.ForJob(nameof(SyncThingTypesJob))
.WithSimpleSchedule(s => s
.WithSimpleSchedule(s => s
.WithIntervalInMinutes(configuration.SyncDatabaseJobRefreshIntervalInMinutes)
.RepeatForever()));

_ = q.AddJob<SyncThingsJob>(j => j.WithIdentity(nameof(SyncThingsJob)))
.AddTrigger(t => t
.WithIdentity($"{nameof(SyncThingsJob)}")
.ForJob(nameof(SyncThingsJob))
.WithSimpleSchedule(s => s
.WithIntervalInMinutes(configuration.SyncDatabaseJobRefreshIntervalInMinutes)
.RepeatForever()));

Expand Down
Loading

0 comments on commit 2be997c

Please sign in to comment.