Skip to content

Commit

Permalink
cherry pick Azure#6085
Browse files Browse the repository at this point in the history
  • Loading branch information
vipeller committed Feb 8, 2022
1 parent 960596c commit 59ec9a0
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static async Task<EdgeHubCertificates> LoadAsync(IConfigurationRoot confi
string edgeletApiVersion = configuration.GetValue<string>(Constants.ConfigKey.WorkloadAPiVersion);
DateTime expiration = DateTime.UtcNow.AddDays(Constants.CertificateValidityDays);

certificates = await CertificateHelper.GetServerCertificatesFromEdgelet(workloadUri, edgeletApiVersion, Constants.WorkloadApiVersion, moduleId, generationId, edgeHubHostname, expiration);
certificates = await CertificateHelper.GetServerCertificatesFromEdgelet(workloadUri, edgeletApiVersion, Constants.WorkloadApiVersion, moduleId, generationId, edgeHubHostname, expiration, logger);
IEnumerable<X509Certificate2> trustBundle = await CertificateHelper.GetTrustBundleFromEdgelet(workloadUri, edgeletApiVersion, Constants.WorkloadApiVersion, moduleId, generationId);

result = new EdgeHubCertificates(
Expand All @@ -66,7 +66,7 @@ public static async Task<EdgeHubCertificates> LoadAsync(IConfigurationRoot confi
// If no connection string was set and we use iotedged workload style certificates for development
(X509Certificate2 ServerCertificate, IEnumerable<X509Certificate2> CertificateChain) certificates;

certificates = CertificateHelper.GetServerCertificateAndChainFromFile(edgeHubDevCertPath, edgeHubDevPrivateKeyPath);
certificates = CertificateHelper.GetServerCertificateAndChainFromFile(edgeHubDevCertPath, edgeHubDevPrivateKeyPath, logger);
IEnumerable<X509Certificate2> trustBundle = CertificateHelper.ParseTrustedBundleFromFile(edgeHubDevTrustBundlePath);

result = new EdgeHubCertificates(
Expand Down
179 changes: 94 additions & 85 deletions edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,99 +54,108 @@ static async Task<int> MainAsync(IConfigurationRoot configuration)

ILogger logger = Logger.Factory.CreateLogger("EdgeHub");

EdgeHubCertificates certificates = await EdgeHubCertificates.LoadAsync(configuration, logger);
bool clientCertAuthEnabled = configuration.GetValue(Constants.ConfigKey.EdgeHubClientCertAuthEnabled, false);

string sslProtocolsConfig = configuration.GetValue(Constants.ConfigKey.SslProtocols, string.Empty);
SslProtocols sslProtocols = SslProtocolsHelper.Parse(sslProtocolsConfig, DefaultSslProtocols, logger);
logger.LogInformation($"Enabling SSL protocols: {sslProtocols.Print()}");

IDependencyManager dependencyManager = new DependencyManager(configuration, certificates.ServerCertificate, certificates.TrustBundle, sslProtocols);
Hosting hosting = Hosting.Initialize(configuration, certificates.ServerCertificate, dependencyManager, clientCertAuthEnabled, sslProtocols);
IContainer container = hosting.Container;

logger.LogInformation("Initializing Edge Hub");
LogLogo(logger);
LogVersionInfo(logger);
logger.LogInformation($"OptimizeForPerformance={configuration.GetValue("OptimizeForPerformance", true)}");
logger.LogInformation($"MessageAckTimeoutSecs={configuration.GetValue("MessageAckTimeoutSecs", 30)}");
logger.LogInformation("Loaded server certificate with expiration date of {0}", certificates.ServerCertificate.NotAfter.ToString("o"));

var metricsProvider = container.Resolve<IMetricsProvider>();
Metrics.InitWithAspNet(metricsProvider, logger); // Note this requires App.UseMetricServer() to be called in Startup.cs

// EdgeHub and CloudConnectionProvider have a circular dependency. So need to Bind the EdgeHub to the CloudConnectionProvider.
IEdgeHub edgeHub = await container.Resolve<Task<IEdgeHub>>();
ICloudConnectionProvider cloudConnectionProvider = await container.Resolve<Task<ICloudConnectionProvider>>();
cloudConnectionProvider.BindEdgeHub(edgeHub);

// EdgeHub cloud proxy and DeviceConnectivityManager have a circular dependency,
// so the cloud proxy has to be set on the DeviceConnectivityManager after both have been initialized.
var deviceConnectivityManager = container.Resolve<IDeviceConnectivityManager>();
IConnectionManager connectionManager = await container.Resolve<Task<IConnectionManager>>();
(deviceConnectivityManager as DeviceConnectivityManager)?.SetConnectionManager(connectionManager);

// Register EdgeHub credentials
var edgeHubCredentials = container.ResolveNamed<IClientCredentials>("EdgeHubCredentials");
ICredentialsCache credentialsCache = await container.Resolve<Task<ICredentialsCache>>();
await credentialsCache.Add(edgeHubCredentials);

// Register EdgeHub indentity in device scopes cache.
// When we connect upstream, we verify that identity is in scope.
// On a fresh start, we may not yet received the scopes from the upstream, so we need
// to force add edgeHub in the cache so it is able to connect upstream.
// Once we get the scopes from the upstream, this record is replaced.
ServiceIdentity edgeHubIdentity = container.ResolveNamed<ServiceIdentity>("EdgeHubIdentity");
IServiceIdentityHierarchy identityScopes = container.Resolve<IServiceIdentityHierarchy>();
await identityScopes.AddOrUpdate(edgeHubIdentity);

// Initializing configuration
logger.LogInformation("Initializing configuration");
IConfigSource configSource = await container.Resolve<Task<IConfigSource>>();
ConfigUpdater configUpdater = await container.Resolve<Task<ConfigUpdater>>();
ExperimentalFeatures experimentalFeatures = CreateExperimentalFeatures(configuration);
var configUpdaterStartupFailed = new TaskCompletionSource<bool>();
var configDownloadTask = configUpdater.Init(configSource);

_ = configDownloadTask.ContinueWith(
_ => configUpdaterStartupFailed.SetResult(false),
TaskContinuationOptions.OnlyOnFaulted);

if (!Enum.TryParse(configuration.GetValue("AuthenticationMode", string.Empty), true, out AuthenticationMode authenticationMode)
|| authenticationMode != AuthenticationMode.Cloud)
try
{
ConnectionReauthenticator connectionReauthenticator = await container.Resolve<Task<ConnectionReauthenticator>>();
connectionReauthenticator.Init();
}

TimeSpan shutdownWaitPeriod = TimeSpan.FromSeconds(configuration.GetValue("ShutdownWaitPeriod", DefaultShutdownWaitPeriod));
(CancellationTokenSource cts, ManualResetEventSlim completed, Option<object> handler) = ShutdownHandler.Init(shutdownWaitPeriod, logger);

using (IProtocolHead mqttBrokerProtocolHead = await GetMqttBrokerProtocolHeadAsync(experimentalFeatures, container))
using (IProtocolHead edgeHubProtocolHead = await GetEdgeHubProtocolHeadAsync(logger, configuration, experimentalFeatures, container, hosting))
using (var renewal = new CertificateRenewal(certificates, logger))
{
try
EdgeHubCertificates certificates = await EdgeHubCertificates.LoadAsync(configuration, logger);
bool clientCertAuthEnabled = configuration.GetValue(Constants.ConfigKey.EdgeHubClientCertAuthEnabled, false);

string sslProtocolsConfig = configuration.GetValue(Constants.ConfigKey.SslProtocols, string.Empty);
SslProtocols sslProtocols = SslProtocolsHelper.Parse(sslProtocolsConfig, DefaultSslProtocols, logger);
logger.LogInformation($"Enabling SSL protocols: {sslProtocols.Print()}");

IDependencyManager dependencyManager = new DependencyManager(configuration, certificates.ServerCertificate, certificates.TrustBundle, sslProtocols);
Hosting hosting = Hosting.Initialize(configuration, certificates.ServerCertificate, dependencyManager, clientCertAuthEnabled, sslProtocols);
IContainer container = hosting.Container;

logger.LogInformation("Initializing Edge Hub");
LogLogo(logger);
LogVersionInfo(logger);
logger.LogInformation($"OptimizeForPerformance={configuration.GetValue("OptimizeForPerformance", true)}");
logger.LogInformation($"MessageAckTimeoutSecs={configuration.GetValue("MessageAckTimeoutSecs", 30)}");
logger.LogInformation("Loaded server certificate with expiration date of {0}", certificates.ServerCertificate.NotAfter.ToString("o"));

var metricsProvider = container.Resolve<IMetricsProvider>();
Metrics.InitWithAspNet(metricsProvider, logger); // Note this requires App.UseMetricServer() to be called in Startup.cs

// EdgeHub and CloudConnectionProvider have a circular dependency. So need to Bind the EdgeHub to the CloudConnectionProvider.
IEdgeHub edgeHub = await container.Resolve<Task<IEdgeHub>>();
ICloudConnectionProvider cloudConnectionProvider = await container.Resolve<Task<ICloudConnectionProvider>>();
cloudConnectionProvider.BindEdgeHub(edgeHub);

// EdgeHub cloud proxy and DeviceConnectivityManager have a circular dependency,
// so the cloud proxy has to be set on the DeviceConnectivityManager after both have been initialized.
var deviceConnectivityManager = container.Resolve<IDeviceConnectivityManager>();
IConnectionManager connectionManager = await container.Resolve<Task<IConnectionManager>>();
(deviceConnectivityManager as DeviceConnectivityManager)?.SetConnectionManager(connectionManager);

// Register EdgeHub credentials
var edgeHubCredentials = container.ResolveNamed<IClientCredentials>("EdgeHubCredentials");
ICredentialsCache credentialsCache = await container.Resolve<Task<ICredentialsCache>>();
await credentialsCache.Add(edgeHubCredentials);

// Register EdgeHub indentity in device scopes cache.
// When we connect upstream, we verify that identity is in scope.
// On a fresh start, we may not yet received the scopes from the upstream, so we need
// to force add edgeHub in the cache so it is able to connect upstream.
// Once we get the scopes from the upstream, this record is replaced.
ServiceIdentity edgeHubIdentity = container.ResolveNamed<ServiceIdentity>("EdgeHubIdentity");
IServiceIdentityHierarchy identityScopes = container.Resolve<IServiceIdentityHierarchy>();
await identityScopes.AddOrUpdate(edgeHubIdentity);

// Initializing configuration
logger.LogInformation("Initializing configuration");
IConfigSource configSource = await container.Resolve<Task<IConfigSource>>();
ConfigUpdater configUpdater = await container.Resolve<Task<ConfigUpdater>>();
ExperimentalFeatures experimentalFeatures = CreateExperimentalFeatures(configuration);
var configUpdaterStartupFailed = new TaskCompletionSource<bool>();
var configDownloadTask = configUpdater.Init(configSource);

_ = configDownloadTask.ContinueWith(
_ => configUpdaterStartupFailed.SetResult(false),
TaskContinuationOptions.OnlyOnFaulted);

if (!Enum.TryParse(configuration.GetValue("AuthenticationMode", string.Empty), true, out AuthenticationMode authenticationMode)
|| authenticationMode != AuthenticationMode.Cloud)
{
await Task.WhenAll(mqttBrokerProtocolHead.StartAsync(), configDownloadTask);
await edgeHubProtocolHead.StartAsync();
await Task.WhenAny(cts.Token.WhenCanceled(), renewal.Token.WhenCanceled(), configUpdaterStartupFailed.Task);
ConnectionReauthenticator connectionReauthenticator = await container.Resolve<Task<ConnectionReauthenticator>>();
connectionReauthenticator.Init();
}
catch (Exception ex)

TimeSpan shutdownWaitPeriod = TimeSpan.FromSeconds(configuration.GetValue("ShutdownWaitPeriod", DefaultShutdownWaitPeriod));
(CancellationTokenSource cts, ManualResetEventSlim completed, Option<object> handler) = ShutdownHandler.Init(shutdownWaitPeriod, logger);

using (IProtocolHead mqttBrokerProtocolHead = await GetMqttBrokerProtocolHeadAsync(experimentalFeatures, container))
using (IProtocolHead edgeHubProtocolHead = await GetEdgeHubProtocolHeadAsync(logger, configuration, experimentalFeatures, container, hosting))
using (var renewal = new CertificateRenewal(certificates, logger))
{
logger.LogError($"Error starting protocol heads: {ex.Message}");
try
{
await Task.WhenAll(mqttBrokerProtocolHead.StartAsync(), configDownloadTask);
await edgeHubProtocolHead.StartAsync();
await Task.WhenAny(cts.Token.WhenCanceled(), renewal.Token.WhenCanceled(), configUpdaterStartupFailed.Task);
}
catch (Exception ex)
{
logger.LogError($"Error starting protocol heads: {ex.Message}");
}

logger.LogInformation("Stopping the protocol heads...");
await Task.WhenAll(mqttBrokerProtocolHead.CloseAsync(CancellationToken.None), edgeHubProtocolHead.CloseAsync(CancellationToken.None));
logger.LogInformation("Protocol heads stopped.");

await CloseDbStoreProviderAsync(container);
}

logger.LogInformation("Stopping the protocol heads...");
await Task.WhenAll(mqttBrokerProtocolHead.CloseAsync(CancellationToken.None), edgeHubProtocolHead.CloseAsync(CancellationToken.None));
logger.LogInformation("Protocol heads stopped.");

await CloseDbStoreProviderAsync(container);
completed.Set();
handler.ForEach(h => GC.KeepAlive(h));
logger.LogInformation("Shutdown complete.");
}
catch (Exception ex)
{
logger.LogError(ex, "Stopping with exception");
throw;
}

completed.Set();
handler.ForEach(h => GC.KeepAlive(h));
logger.LogInformation("Shutdown complete.");
return 0;
}

Expand Down
Loading

0 comments on commit 59ec9a0

Please sign in to comment.