Skip to content

Commit

Permalink
EdgeHub awaits for Twin in non-MQTT broker scenario (Azure#4084) (Azu…
Browse files Browse the repository at this point in the history
…re#4129)

TL;DR (cherry-picked: Azure@40acc96)
Currently the protocol head could start up before the route configuration is setup on edgeHub. This causes the messages which are sent before the route is config to be permanently lost.

Background:
Historically, the edgeHub tries to download its twin to get the routings before it starts up the protocol heads, then it was waiting till the twin arrived, then it had the routings, then it started the protocol heads. With the new broker and nested scenario, the problem is that it needs the protocol heads up to get the twin. The twin comes through the MQTT broker, and one of the protocol heads handle the messages from the broker. The problem arises because protocol heads are up, but no twin available yet.

Fix:
If the upstream is not via MQTT broker, then await the twin task; otherwise, let it runs.

Todo:
This code change does not handle the scenario:
IoTHub <--MQTT/BRIDGE-- EdgeDevice <--AMQP-- DeviceA

Since we are not waiting on the configUpdate (that would be a dead lock, because configUpdate() needs the new mqtt/broker to be started), the code goes ahead and starts every protocol head i.e. AMQP listener for connecting clients. This can cause an AMQP message to be sent (into the void) before a route is setup on the DeviceA.
  • Loading branch information
yophilav authored Dec 18, 2020
1 parent 18fb69f commit ee0b87f
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,9 @@ void RegisterRoutingModule(
bool checkEntireQueueOnCleanup = this.configuration.GetValue("CheckEntireQueueOnCleanup", false);
int messageCleanupIntervalSecs = this.configuration.GetValue("MessageCleanupIntervalSecs", 1800);
bool closeCloudConnectionOnDeviceDisconnect = this.configuration.GetValue("CloseCloudConnectionOnDeviceDisconnect", true);

bool isLegacyUpstream = !experimentalFeatures.Enabled
|| !experimentalFeatures.EnableMqttBroker
|| !experimentalFeatures.EnableNestedEdge
|| !this.GetConfigurationValueIfExists<string>(Constants.ConfigKey.GatewayHostname).HasValue;
bool isLegacyUpstream = ExperimentalFeatures.IsViaBrokerUpstream(
experimentalFeatures,
this.GetConfigurationValueIfExists<string>(Constants.ConfigKey.GatewayHostname).HasValue);

builder.RegisterModule(
new RoutingModule(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ public static ExperimentalFeatures Create(IConfiguration experimentalFeaturesCon
return experimentalFeatures;
}

public static bool IsViaBrokerUpstream(ExperimentalFeatures experimentalFeatures, bool hasGatewayHostname)
{
bool isLegacyUpstream = !experimentalFeatures.Enabled
|| !experimentalFeatures.EnableMqttBroker
|| !experimentalFeatures.EnableNestedEdge
|| !hasGatewayHostname;

return isLegacyUpstream;
}

public bool Enabled { get; }

public bool DisableCloudSubscriptions { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,20 @@ static async Task<int> MainAsync(IConfigurationRoot configuration)
logger.LogInformation("Initializing configuration");
IConfigSource configSource = await container.Resolve<Task<IConfigSource>>();
ConfigUpdater configUpdater = await container.Resolve<Task<ConfigUpdater>>();
ExperimentalFeatures experimentalFeatures = CreateExperimentalFeatures(configuration);
var configUpdaterStartupFailed = new TaskCompletionSource<bool>();
_ = configUpdater.Init(configSource).ContinueWith(
_ => configUpdaterStartupFailed.SetResult(false),
TaskContinuationOptions.OnlyOnFaulted);
var configDownloadTask = configUpdater.Init(configSource);

_ = configDownloadTask.ContinueWith(
_ => configUpdaterStartupFailed.SetResult(false),
TaskContinuationOptions.OnlyOnFaulted);

if (!ExperimentalFeatures.IsViaBrokerUpstream(
experimentalFeatures,
string.IsNullOrEmpty(configuration.GetValue<string>(Constants.ConfigKey.GatewayHostname))))
{
await configDownloadTask;
}

if (!Enum.TryParse(configuration.GetValue("AuthenticationMode", string.Empty), true, out AuthenticationMode authenticationMode)
|| authenticationMode != AuthenticationMode.Cloud)
Expand All @@ -112,7 +122,7 @@ static async Task<int> MainAsync(IConfigurationRoot configuration)
TimeSpan shutdownWaitPeriod = TimeSpan.FromSeconds(configuration.GetValue("ShutdownWaitPeriod", DefaultShutdownWaitPeriod));
(CancellationTokenSource cts, ManualResetEventSlim completed, Option<object> handler) = ShutdownHandler.Init(shutdownWaitPeriod, logger);

using (IProtocolHead protocolHead = await GetEdgeHubProtocolHeadAsync(logger, configuration, container, hosting))
using (IProtocolHead protocolHead = await GetEdgeHubProtocolHeadAsync(logger, configuration, experimentalFeatures, container, hosting))
using (var renewal = new CertificateRenewal(certificates, logger))
{
try
Expand All @@ -138,6 +148,13 @@ static async Task<int> MainAsync(IConfigurationRoot configuration)
return 0;
}

static ExperimentalFeatures CreateExperimentalFeatures(IConfigurationRoot configuration)
{
IConfiguration experimentalFeaturesConfig = configuration.GetSection(Constants.ConfigKey.ExperimentalFeatures);
ExperimentalFeatures experimentalFeatures = ExperimentalFeatures.Create(experimentalFeaturesConfig, Logger.Factory.CreateLogger("EdgeHub"));
return experimentalFeatures;
}

static void LogVersionInfo(ILogger logger)
{
VersionInfo versionInfo = VersionInfo.Get(Constants.VersionInfoFileName);
Expand All @@ -147,11 +164,8 @@ static void LogVersionInfo(ILogger logger)
}
}

static async Task<EdgeHubProtocolHead> GetEdgeHubProtocolHeadAsync(ILogger logger, IConfigurationRoot configuration, IContainer container, Hosting hosting)
static async Task<EdgeHubProtocolHead> GetEdgeHubProtocolHeadAsync(ILogger logger, IConfigurationRoot configuration, ExperimentalFeatures experimentalFeatures, IContainer container, Hosting hosting)
{
IConfiguration experimentalFeaturesConfig = configuration.GetSection(Constants.ConfigKey.ExperimentalFeatures);
ExperimentalFeatures experimentalFeatures = ExperimentalFeatures.Create(experimentalFeaturesConfig, Logger.Factory.CreateLogger("EdgeHub"));

var protocolHeads = new List<IProtocolHead>();

// MQTT broker overrides the legacy MQTT protocol head
Expand Down

0 comments on commit ee0b87f

Please sign in to comment.