Skip to content

Commit

Permalink
#1924 : Sync Things Device
Browse files Browse the repository at this point in the history
- Sync Things Job
- Add Job Quartz
- TU Sync Things Job
  • Loading branch information
delager committed May 30, 2023
1 parent bd9e43c commit e41cf8e
Show file tree
Hide file tree
Showing 5 changed files with 466 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ public AWSDeviceThingProfile()
.ForMember(dest => dest.ThingName, opts => opts.MapFrom(src => src.DeviceName))
.ForMember(dest => dest.Payload, opts => opts.MapFrom(src => EmptyPayload()))
.ReverseMap();

_ = CreateMap<DescribeThingResponse, Device>()
.ForMember(dest => dest.Id, opts => opts.MapFrom(src => src.ThingId))
.ForMember(dest => dest.Name, opts => opts.MapFrom(src => src.ThingName))
.ForMember(dest => dest.Version, opts => opts.MapFrom(src => src.Version))
.ForMember(dest => dest.Tags, opts => opts.MapFrom(src => src.Attributes.Select(att => new DeviceTagValue
{
Name = att.Key,
Value = att.Value
})));
}

private static MemoryStream EmptyPayload()
Expand Down
3 changes: 2 additions & 1 deletion src/AzureIoTHub.Portal.Domain/Entities/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace AzureIoTHub.Portal.Domain.Entities
{
using System.Collections.ObjectModel;
using AzureIoTHub.Portal.Domain.Base;

public class Device : EntityBase
Expand Down Expand Up @@ -50,6 +51,6 @@ public class Device : EntityBase
/// <summary>
/// List of custom device tags and their values.
/// </summary>
public ICollection<DeviceTagValue> Tags { get; set; } = default!;
public ICollection<DeviceTagValue> Tags { get; set; } = new Collection<DeviceTagValue>();
}
}
159 changes: 159 additions & 0 deletions src/AzureIoTHub.Portal.Infrastructure/Jobs/AWS/SyncThingsJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright (c) CGI France. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace AzureIoTHub.Portal.Infrastructure.Jobs.AWS
{
using System.Net;
using System.Threading.Tasks;
using Amazon.IoT;
using Amazon.IoT.Model;
using Amazon.IotData;
using Amazon.IotData.Model;
using AutoMapper;
using AzureIoTHub.Portal.Domain;
using AzureIoTHub.Portal.Domain.Entities;
using AzureIoTHub.Portal.Domain.Repositories;
using Microsoft.Extensions.Logging;
using Quartz;
using Quartz.Util;

[DisallowConcurrentExecution]
public class SyncThingsJob : IJob
{

private readonly ILogger<SyncThingsJob> logger;
private readonly IMapper mapper;
private readonly IUnitOfWork unitOfWork;
private readonly IDeviceRepository deviceRepository;
private readonly IDeviceModelRepository deviceModelRepository;
private readonly IDeviceTagValueRepository deviceTagValueRepository;
private readonly IAmazonIoT amazonIoTClient;
private readonly IAmazonIotData amazonIoTDataClient;

public SyncThingsJob(
ILogger<SyncThingsJob> logger,
IMapper mapper,
IUnitOfWork unitOfWork,
IDeviceRepository deviceRepository,
IDeviceModelRepository deviceModelRepository,
IDeviceTagValueRepository deviceTagValueRepository,
IAmazonIoT amazonIoTClient,
IAmazonIotData amazonIoTDataClient)
{
this.mapper = mapper;
this.unitOfWork = unitOfWork;
this.deviceRepository = deviceRepository;
this.deviceModelRepository = deviceModelRepository;
this.deviceTagValueRepository = deviceTagValueRepository;
this.amazonIoTClient = amazonIoTClient;
this.amazonIoTDataClient = amazonIoTDataClient;
this.logger = logger;
}


public async Task Execute(IJobExecutionContext context)
{
try
{
this.logger.LogInformation("Start of sync Thing Types job");

await SyncThingsAsDevices();

this.logger.LogInformation("End of sync Thing Types job");
}
catch (Exception e)
{
this.logger.LogError(e, "Sync Thing Types job has failed");
}
}

private async Task SyncThingsAsDevices()
{
var things = await GetAllThings();

foreach (var thing in things)
{
//ThingType not specified
if (thing.ThingTypeName.IsNullOrWhiteSpace())
{
this.logger.LogInformation($"Cannot import device '{thing.ThingName}' since it doesn't have related thing type.");
continue;
}

//ThingType not find in DB
var deviceModel = this.deviceModelRepository.GetByName(thing.ThingTypeName);
if (deviceModel == null)
{
this.logger.LogWarning($"Cannot import device '{thing.ThingName}'. The ThingType '{thing.ThingTypeName}' doesn't exist");
continue;
}

//ThingShadow not specified
var thingShadowRequest = new GetThingShadowRequest()
{
ThingName = thing.ThingName
};
var thingShadow = await this.amazonIoTDataClient.GetThingShadowAsync(thingShadowRequest);
if (thingShadow.HttpStatusCode.Equals(HttpStatusCode.NotFound))
{
this.logger.LogWarning($"Cannot import device '{thing.ThingName}' since it doesn't have related thing shadow");
continue;
}

//Create or update the thing
await CreateOrUpdateThing(thing, deviceModel);
}

foreach (var item in (await this.deviceRepository.GetAllAsync()).Where(device => !things.Exists(x => x.ThingId == device.Id)))
{
var deviceEntity = await this.deviceRepository.GetByIdAsync(item.Id, d => d.Tags, d => d.Labels);
this.deviceRepository.Delete(deviceEntity!.Id);
}

await this.unitOfWork.SaveAsync();
}

private async Task<List<DescribeThingResponse>> GetAllThings()
{
var things = new List<DescribeThingResponse>();

var response = await amazonIoTClient.ListThingsAsync();

foreach (var thing in response.Things)
{
var requestDescribeThing = new DescribeThingRequest
{
ThingName = thing.ThingName
};

things.Add(await this.amazonIoTClient.DescribeThingAsync(requestDescribeThing));
}

return things;
}

private async Task CreateOrUpdateThing(DescribeThingResponse thing, DeviceModel deviceModel)
{
var device = this.mapper.Map<Device>(thing);
var deviceEntity = await this.deviceRepository.GetByIdAsync(device.Id, d => d.Tags);
device.DeviceModelId = deviceModel.Id;

if (deviceEntity == null)
{
await this.deviceRepository.InsertAsync(device);
}
else
{
if (deviceEntity.Version >= device.Version) return;

foreach (var deviceTagEntity in deviceEntity.Tags)
{
this.deviceTagValueRepository.Delete(deviceTagEntity.Id);
}

_ = this.mapper.Map(device, deviceEntity);
this.deviceRepository.Update(deviceEntity);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace AzureIoTHub.Portal.Infrastructure.Startup
{
using System.Net;
using Amazon;
using Amazon.GreengrassV2;
using Amazon.IoT;
Expand All @@ -19,6 +20,7 @@ namespace AzureIoTHub.Portal.Infrastructure.Startup
using AzureIoTHub.Portal.Infrastructure.Services.AWS;
using AzureIoTHub.Portal.Models.v10;
using Microsoft.Extensions.DependencyInjection;
using Prometheus;
using Quartz;

public static class AWSServiceCollectionExtension
Expand Down Expand Up @@ -81,7 +83,15 @@ private static IServiceCollection ConfigureAWSSyncJobs(this IServiceCollection s
.AddTrigger(t => t
.WithIdentity($"{nameof(SyncThingTypesJob)}")
.ForJob(nameof(SyncThingTypesJob))
.WithSimpleSchedule(s => s
.WithSimpleSchedule(s => s
.WithIntervalInMinutes(configuration.SyncDatabaseJobRefreshIntervalInMinutes)
.RepeatForever()));

_ = q.AddJob<SyncThingsJob>(j => j.WithIdentity(nameof(SyncThingsJob)))
.AddTrigger(t => t
.WithIdentity($"{nameof(SyncThingsJob)}")
.ForJob(nameof(SyncThingsJob))
.WithSimpleSchedule(s => s
.WithIntervalInMinutes(configuration.SyncDatabaseJobRefreshIntervalInMinutes)
.RepeatForever()));
});
Expand Down
Loading

0 comments on commit e41cf8e

Please sign in to comment.