From 93515abcd699b55ce724e45d782911f40cd87a12 Mon Sep 17 00:00:00 2001 From: yophilav <54859653+yophilav@users.noreply.github.com> Date: Thu, 17 Dec 2020 17:28:58 -0800 Subject: [PATCH] EdgeHub awaits for Twin in non-MQTT broker scenario (#4084) TL;DR Currently the protocol head could start up before the route configuration is setup on edgeHub. This causes the messages which are sent before the route is config to be permanently lost. Background: Historically, the edgeHub tries to download its twin to get the routings before it starts up the protocol heads, then it was waiting till the twin arrived, then it had the routings, then it started the protocol heads. With the new broker and nested scenario, the problem is that it needs the protocol heads up to get the twin. The twin comes through the MQTT broker, and one of the protocol heads handle the messages from the broker. The problem arises because protocol heads are up, but no twin available yet. Fix: If the upstream is not via MQTT broker, then await the twin task; otherwise, let it runs. Todo: This code change does not handle the scenario: IoTHub <--MQTT/BRIDGE-- EdgeDevice <--AMQP-- DeviceA Since we are not waiting on the configUpdate (that would be a dead lock, because configUpdate() needs the new mqtt/broker to be started), the code goes ahead and starts every protocol head i.e. AMQP listener for connecting clients. This can cause an AMQP message to be sent (into the void) before a route is setup on the DeviceA. --- .../DependencyManager.cs | 8 ++--- .../ExperimentalFeatures.cs | 10 +++++++ .../Program.cs | 30 ++++++++++++++----- 3 files changed, 35 insertions(+), 13 deletions(-) diff --git a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/DependencyManager.cs b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/DependencyManager.cs index 8b1842a02a0..92949014c71 100644 --- a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/DependencyManager.cs +++ b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/DependencyManager.cs @@ -210,11 +210,9 @@ void RegisterRoutingModule( bool checkEntireQueueOnCleanup = this.configuration.GetValue("CheckEntireQueueOnCleanup", false); int messageCleanupIntervalSecs = this.configuration.GetValue("MessageCleanupIntervalSecs", 1800); bool closeCloudConnectionOnDeviceDisconnect = this.configuration.GetValue("CloseCloudConnectionOnDeviceDisconnect", true); - - bool isLegacyUpstream = !experimentalFeatures.Enabled - || !experimentalFeatures.EnableMqttBroker - || !experimentalFeatures.EnableNestedEdge - || !this.GetConfigurationValueIfExists(Constants.ConfigKey.GatewayHostname).HasValue; + bool isLegacyUpstream = ExperimentalFeatures.IsViaBrokerUpstream( + experimentalFeatures, + this.GetConfigurationValueIfExists(Constants.ConfigKey.GatewayHostname).HasValue); builder.RegisterModule( new RoutingModule( diff --git a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/ExperimentalFeatures.cs b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/ExperimentalFeatures.cs index 8ef1bd557f8..0c0ddad65ae 100644 --- a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/ExperimentalFeatures.cs +++ b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/ExperimentalFeatures.cs @@ -28,6 +28,16 @@ public static ExperimentalFeatures Create(IConfiguration experimentalFeaturesCon return experimentalFeatures; } + public static bool IsViaBrokerUpstream(ExperimentalFeatures experimentalFeatures, bool hasGatewayHostname) + { + bool isLegacyUpstream = !experimentalFeatures.Enabled + || !experimentalFeatures.EnableMqttBroker + || !experimentalFeatures.EnableNestedEdge + || !hasGatewayHostname; + + return isLegacyUpstream; + } + public bool Enabled { get; } public bool DisableCloudSubscriptions { get; } diff --git a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/Program.cs b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/Program.cs index 20b96ec6818..837cf079583 100644 --- a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/Program.cs +++ b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/Program.cs @@ -97,10 +97,20 @@ static async Task MainAsync(IConfigurationRoot configuration) logger.LogInformation("Initializing configuration"); IConfigSource configSource = await container.Resolve>(); ConfigUpdater configUpdater = await container.Resolve>(); + ExperimentalFeatures experimentalFeatures = CreateExperimentalFeatures(configuration); var configUpdaterStartupFailed = new TaskCompletionSource(); - _ = configUpdater.Init(configSource).ContinueWith( - _ => configUpdaterStartupFailed.SetResult(false), - TaskContinuationOptions.OnlyOnFaulted); + var configDownloadTask = configUpdater.Init(configSource); + + _ = configDownloadTask.ContinueWith( + _ => configUpdaterStartupFailed.SetResult(false), + TaskContinuationOptions.OnlyOnFaulted); + + if (!ExperimentalFeatures.IsViaBrokerUpstream( + experimentalFeatures, + string.IsNullOrEmpty(configuration.GetValue(Constants.ConfigKey.GatewayHostname)))) + { + await configDownloadTask; + } if (!Enum.TryParse(configuration.GetValue("AuthenticationMode", string.Empty), true, out AuthenticationMode authenticationMode) || authenticationMode != AuthenticationMode.Cloud) @@ -112,7 +122,7 @@ static async Task MainAsync(IConfigurationRoot configuration) TimeSpan shutdownWaitPeriod = TimeSpan.FromSeconds(configuration.GetValue("ShutdownWaitPeriod", DefaultShutdownWaitPeriod)); (CancellationTokenSource cts, ManualResetEventSlim completed, Option handler) = ShutdownHandler.Init(shutdownWaitPeriod, logger); - using (IProtocolHead protocolHead = await GetEdgeHubProtocolHeadAsync(logger, configuration, container, hosting)) + using (IProtocolHead protocolHead = await GetEdgeHubProtocolHeadAsync(logger, configuration, experimentalFeatures, container, hosting)) using (var renewal = new CertificateRenewal(certificates, logger)) { try @@ -138,6 +148,13 @@ static async Task MainAsync(IConfigurationRoot configuration) return 0; } + static ExperimentalFeatures CreateExperimentalFeatures(IConfigurationRoot configuration) + { + IConfiguration experimentalFeaturesConfig = configuration.GetSection(Constants.ConfigKey.ExperimentalFeatures); + ExperimentalFeatures experimentalFeatures = ExperimentalFeatures.Create(experimentalFeaturesConfig, Logger.Factory.CreateLogger("EdgeHub")); + return experimentalFeatures; + } + static void LogVersionInfo(ILogger logger) { VersionInfo versionInfo = VersionInfo.Get(Constants.VersionInfoFileName); @@ -147,11 +164,8 @@ static void LogVersionInfo(ILogger logger) } } - static async Task GetEdgeHubProtocolHeadAsync(ILogger logger, IConfigurationRoot configuration, IContainer container, Hosting hosting) + static async Task GetEdgeHubProtocolHeadAsync(ILogger logger, IConfigurationRoot configuration, ExperimentalFeatures experimentalFeatures, IContainer container, Hosting hosting) { - IConfiguration experimentalFeaturesConfig = configuration.GetSection(Constants.ConfigKey.ExperimentalFeatures); - ExperimentalFeatures experimentalFeatures = ExperimentalFeatures.Create(experimentalFeaturesConfig, Logger.Factory.CreateLogger("EdgeHub")); - var protocolHeads = new List(); // MQTT broker overrides the legacy MQTT protocol head