Skip to content

Commit

Permalink
Update GeothermalResearchInstitute.ServerConsole to Grpc V2 protocol.
Browse files Browse the repository at this point in the history
  • Loading branch information
hcoona committed Dec 9, 2019
1 parent 0e0fc2c commit 3a0f7d1
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 293 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected virtual void Dispose(bool disposing)
[SuppressMessage("样式", "IDE0060:删除未使用的参数", Justification = "Required for callback delegate.")]
private void HearbeatEntryPoint(object state)
{
foreach (var entry in this.devices)
foreach (DeviceOptionsEntry entry in this.devices)
{
// TODO(zhangshuai.ustc): Implement it.
// 1. Get corresponding grpc client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="3.0.0" DevelopmentDependency="true" PrivateAssets="All" />
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="3.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Ini" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Configuration" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="3.1.0" DevelopmentDependency="true" PrivateAssets="All" />
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="3.1.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.CommandLine" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Ini" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Configuration" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public Task StartAsync(CancellationToken cancellationToken)
{
this.logger.LogInformation(
"Grpc services are listening on {}",
string.Join(",", this.server.Ports.Select(p => $"{p.Host}:{p.Port}")));
string.Join(",", this.server.Ports.Select(p => $"{p.Host}:{p.BoundPort}")));
}

return Task.CompletedTask;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,16 @@
using System.Collections.Concurrent;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using GeothermalResearchInstitute.ServerConsole.Models;
using GeothermalResearchInstitute.ServerConsole.Utils;
using GeothermalResearchInstitute.v1;
using GeothermalResearchInstitute.v2;
using Google.Protobuf;
using Grpc.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using static Google.Protobuf.WellKnownTypes.FieldMask;
using GrpcDevice = GeothermalResearchInstitute.v1.Device;
using GrpcDeviceMetrics = GeothermalResearchInstitute.v1.DeviceMetrics;
using ModelDeviceMetrics = GeothermalResearchInstitute.ServerConsole.Models.DeviceMetrics;
using GrpcDevice = GeothermalResearchInstitute.v2.Device;
using GrpcDeviceMetrics = GeothermalResearchInstitute.v2.Metric;

namespace GeothermalResearchInstitute.ServerConsole.GrpcServices
{
Expand All @@ -47,221 +43,54 @@ public DeviceServiceImpl(

if (this.logger.IsEnabled(LogLevel.Debug))
{
var deviceOptions = this.serviceProvider.GetRequiredService<IOptionsSnapshot<DeviceOptions>>();
foreach (var d in deviceOptions.Value.Devices)
IOptionsSnapshot<DeviceOptions> deviceOptions = this.serviceProvider.GetRequiredService<IOptionsSnapshot<DeviceOptions>>();
foreach (DeviceOptionsEntry d in deviceOptions.Value.Devices)
{
this.logger.LogDebug(
"{0}={1}",
string.Join(string.Empty, d.ComputeIdBinary().Select(b => b.ToString("X2", CultureInfo.InvariantCulture))),
d.Name);
}
}
}

public override Task<ListDevicesResponse> ListDevices(ListDevicesRequest request, ServerCallContext context)
{
var deviceOptions = this.serviceProvider.GetRequiredService<IOptionsSnapshot<DeviceOptions>>();
var response = new ListDevicesResponse();
response.Devices.Add(deviceOptions.Value.Devices.Select(d => new GrpcDevice
{
Id = ByteString.CopyFrom(d.ComputeIdBinary()),
Name = d.Name,
}));
return Task.FromResult(response);
}

public override Task<GrpcDevice> GetDevice(GetDeviceRequest request, ServerCallContext context)
{
var deviceOptions = this.serviceProvider.GetRequiredService<IOptionsSnapshot<DeviceOptions>>();
var deviceBasicInformation = deviceOptions.Value.Devices.SingleOrDefault(d => d.ComputeIdBinary().SequenceEqual(request.Id));
if (deviceBasicInformation == null)
{
throw new RpcException(new Status(StatusCode.NotFound, "Device Id is not configured."));
}

var deviceAdditionalInformation = this.bjdireContext.DevicesActualStates.Find(new object[]
{
request.Id.ToByteArray(),
});
if (deviceAdditionalInformation == null)
{
deviceAdditionalInformation = new DeviceActualStates();
}

var device = new GrpcDevice
{
Id = request.Id,
};
switch (request.View)
{
case DeviceView.NameOnly:
device.AssignNameFrom(deviceBasicInformation);
break;
case DeviceView.WorkingModeOnly:
device.AssignWorkingModeFrom(deviceAdditionalInformation);
break;
case DeviceView.DeviceOptionOnly:
device.AssignOptionFrom(deviceAdditionalInformation);
break;
case DeviceView.MetricsAndControl:
if (this.metricsMap.TryGetValue(request.Id, out var metrics))
{
device.Metrics = new GrpcDeviceMetrics(metrics);
}
else
{
device.Metrics = new GrpcDeviceMetrics();
}

device.AssignControlsFrom(deviceAdditionalInformation);
break;
default:
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"Invalid DeviceView: " + (int)request.View));
}

return Task.FromResult(device);
}

public override Task<GrpcDevice> UpdateDevice(UpdateDeviceRequest request, ServerCallContext context)
{
var deviceOptions = this.serviceProvider.GetRequiredService<IOptionsSnapshot<DeviceOptions>>();
var deviceBasicInformation = deviceOptions.Value.Devices.SingleOrDefault(d => d.ComputeIdBinary().SequenceEqual(request.Device.Id));
if (deviceBasicInformation == null)
{
throw new RpcException(new Status(StatusCode.NotFound, "Device Id is not configured."));
}

var deviceStates = this.bjdireContext.DevicesDesiredStates.Find(new object[]
{
request.Device.Id.ToByteArray(),
});
if (deviceStates == null)
{
deviceStates = new DeviceDesiredStates
IOptionsSnapshot<AuthenticationOptions> authenticationOptions = this.serviceProvider.GetRequiredService<IOptionsSnapshot<AuthenticationOptions>>();
foreach (Credential c in authenticationOptions.Value.Credentials)
{
Id = request.Device.Id.ToByteArray(),
};
this.bjdireContext.DevicesDesiredStates.Add(deviceStates);
}

if (request.UpdateMask.Paths.Count == 0)
{
throw new RpcException(new Status(StatusCode.InvalidArgument, "Invalid update_mask."));
}

foreach (var path in request.UpdateMask.Paths)
{
switch (path)
{
case "working_mode":
request.Device.AssignWorkingModeTo(deviceStates);
break;
case "device_option":
request.Device.AssignOptionTo(deviceStates);
break;
case "controls":
request.Device.AssignControlsTo(deviceStates);
break;
default:
throw new RpcException(new Status(StatusCode.InvalidArgument, "Invalid update_mask."));
this.logger.LogDebug(c.ToString());
}
}

this.bjdireContext.SaveChanges();

var device = new GrpcDevice
{
Id = request.Device.Id,
};
request.UpdateMask.Merge(request.Device, device, new MergeOptions());

return Task.FromResult(device);
}

public override Task<HeartbeatResponse> Heartbeat(HeartbeatRequest request, ServerCallContext context)
public override Task<AuthenticateResponse> Authenticate(
AuthenticateRequest request, ServerCallContext context)
{
if (request.Device.Ipv4Address.IsEmpty)
IOptionsSnapshot<AuthenticationOptions> authenticationOptions = this.serviceProvider.GetRequiredService<IOptionsSnapshot<AuthenticationOptions>>();
Credential credential = authenticationOptions.Value.Credentials.SingleOrDefault(
c => string.Equals(c.Username, request.Username, StringComparison.Ordinal) &&
string.Equals(c.Password, request.Password, StringComparison.Ordinal));
if (credential == null)
{
request.Device.Ipv4Address = ByteString.CopyFrom(IPAddress.Parse(context.Peer).MapToIPv4().GetAddressBytes());
throw new RpcException(new Status(StatusCode.Unauthenticated, "Invalid username or password."));
}

var deviceOptions = this.serviceProvider.GetRequiredService<IOptionsSnapshot<DeviceOptions>>();
var deviceBasicInformation = deviceOptions.Value.Devices.SingleOrDefault(d => d.ComputeIdBinary().SequenceEqual(request.Device.Id));
if (deviceBasicInformation == null)
else
{
this.logger.LogWarning(
"Received heartbeat from a not configured device: mac={0}, ipv4={1}",
string.Join(string.Empty, request.Device.Id.Select(b => b.ToString("X2", CultureInfo.InvariantCulture))),
new IPAddress(request.Device.Ipv4Address.ToByteArray()).ToString());
throw new RpcException(new Status(StatusCode.NotFound, "Device Id is not configured."));
}

var actualStates = this.bjdireContext.DevicesActualStates.Find(new object[]
{
request.Device.Id.ToByteArray(),
});
if (actualStates == null)
{
actualStates = new DeviceActualStates
return Task.FromResult(new AuthenticateResponse()
{
Id = request.Device.Id.ToByteArray(),
};
this.bjdireContext.DevicesActualStates.Add(actualStates);
}

actualStates.IPAddress = request.Device.Ipv4Address.ToByteArray();
request.Device
.AssignWorkingModeTo(actualStates)
.AssignOptionFrom(actualStates)
.AssignControlsFrom(actualStates);

this.metricsMap.AddOrUpdate(request.Device.Id, _ => request.Device.Metrics, (_, __) => request.Device.Metrics);
foreach (var m in request.HistoryMetrics.Concat(new[] { request.Device.Metrics }))
{
var currentMetrics = this.bjdireContext.DevicesMetrics.Find(new
{
Id = request.Device.Id.ToByteArray(),
Timestamp = request.Device.Metrics.UpdateTimestamp.ToDateTimeOffset(),
Nickname = credential.Nickname,
Role = credential.Role,
});
if (currentMetrics == null)
{
currentMetrics = new ModelDeviceMetrics
{
Id = request.Device.Id.ToByteArray(),
};
this.bjdireContext.DevicesMetrics.Add(currentMetrics);
}

request.Device.Metrics.AssignTo(currentMetrics);
}
}

this.bjdireContext.SaveChanges();

var desiredStates = this.bjdireContext.DevicesDesiredStates.Find(new object[]
{
request.Device.Id.ToByteArray(),
});
if (actualStates == null)
{
desiredStates = new DeviceDesiredStates();
}

var device = new GrpcDevice
{
Id = request.Device.Id,
};

device
.AssignWorkingModeFrom(desiredStates)
.AssignOptionFrom(desiredStates)
.AssignControlsFrom(desiredStates);

return Task.FromResult(new HeartbeatResponse
public override Task<ListDevicesResponse> ListDevices(ListDevicesRequest request, ServerCallContext context)
{
IOptionsSnapshot<DeviceOptions> deviceOptions = this.serviceProvider.GetRequiredService<IOptionsSnapshot<DeviceOptions>>();
var response = new ListDevicesResponse();
response.Devices.Add(deviceOptions.Value.Devices.Select(d => new GrpcDevice
{
Device = device,
});
Id = ByteString.CopyFrom(d.ComputeIdBinary()),
Name = d.Name,
}));
return Task.FromResult(response);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Licensed under the GPLv3 license. See LICENSE file in the project root for full license information.
// </copyright>

using GeothermalResearchInstitute.v1;
using GeothermalResearchInstitute.v2;

namespace GeothermalResearchInstitute.ServerConsole.Models
{
Expand Down
Loading

0 comments on commit 3a0f7d1

Please sign in to comment.