Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into vipeller/cert_imp…
Browse files Browse the repository at this point in the history
…ort_log

# Conflicts:
#	edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/Program.cs
  • Loading branch information
vipeller committed Feb 8, 2022
2 parents cd25014 + 5dd965f commit 3251412
Show file tree
Hide file tree
Showing 14 changed files with 253 additions and 35 deletions.
8 changes: 1 addition & 7 deletions builds/misc/templates/build-rocksdb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,7 @@ stages:
arch: arm64v8
postfix: arm64
steps:
- bash: |
build_image=rocksdb-build:master-$(postfix)-$(Build.BuildNumber) && \
mkdir -p $(Build.ArtifactStagingDirectory)/librocksdb && \
cd $(System.DefaultWorkingDirectory)/edge-util/docker/linux/$(arch) && \
docker build --tag ${build_image} . && \
docker run --rm -v $(Build.ArtifactStagingDirectory)/librocksdb:/artifacts \
${build_image} cp /publish/librocksdb.so.$(postfix) /artifacts
- script: scripts/linux/buildRocksDb.sh --output-dir $(Build.ArtifactStagingDirectory) --postfix $(postfix) --build-number $(Build.BuildNumber) --arch $(arch)
displayName: Build and copy out rocksdb lib
- task: PublishBuildArtifacts@1
displayName: 'Publish Artifacts to VSTS'
Expand Down
8 changes: 2 additions & 6 deletions doc/devguide.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,9 @@ reportgenerator "-reports:TestResults\*\*.coveragexml" "-targetdir:report"

## Build Edge Hub Container Locally

Sometimes it is useful to build the Edge Hub container locally. If you want to do so you can run the below set of scripts:
Sometimes it is useful to build the Edge Hub container locally. If you want to do so you can run the below script:
```
scripts/linux/buildBranch.sh
scripts/linux/cross-platform-rust-build.sh --os alpine --arch amd64 --build-path mqtt/mqttd
scripts/linux/cross-platform-rust-build.sh --os alpine --arch amd64 --build-path edge-hub/watchdog
scripts/linux/consolidate-build-artifacts.sh --artifact-name "edge-hub"
scripts/linux/buildImage.sh -r "$(registry.address)" -u "$(registry.user)" -p "$(registry.password)" -i "${{ parameters.imageName }}" -n "${{ parameters.namespace }}" -P "${{ parameters.project }}" -v "${{ parameters.version }} --bin-dir target"
./scripts/linux/buildLocalEdgeHub.sh --registry-address "$(registry.address)" --version "$(version)"
```

## Build Manifest Image
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ public async Task CloseAsync(CancellationToken token)

public void Dispose()
{
this.CloseAsync(CancellationToken.None).Wait();
}

void OnAcceptTransport(TransportListener transportListener, TransportAsyncCallbackArgs args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class MqttProtocolHead : IProtocolHead

IChannel serverChannel;
IEventLoopGroup eventLoopGroup;
IEventLoopGroup wsEventLoopGroup;
IEventLoopGroup parentEventLoopGroup;

public MqttProtocolHead(
ISettingsProvider settingsProvider,
Expand Down Expand Up @@ -106,13 +108,15 @@ public async Task CloseAsync(CancellationToken token)
{
try
{
this.logger.LogInformation("Stopping");
this.logger.LogInformation("Stopping MQTT protocol head");

await (this.serverChannel?.CloseAsync() ?? TaskEx.Done);
await (this.eventLoopGroup?.ShutdownGracefullyAsync() ?? TaskEx.Done);
await (this.parentEventLoopGroup?.ShutdownGracefullyAsync() ?? TaskEx.Done);
await (this.wsEventLoopGroup?.ShutdownGracefullyAsync() ?? TaskEx.Done);
// TODO: gracefully shutdown the MultithreadEventLoopGroup in MqttWebSocketListener?
// TODO: this.webSocketListenerRegistry.TryUnregister("mqtts")?
this.logger.LogInformation("Stopped");
this.logger.LogInformation("Stopped MQTT protocol head");
}
catch (Exception ex)
{
Expand All @@ -123,7 +127,6 @@ public async Task CloseAsync(CancellationToken token)
public void Dispose()
{
this.mqttConnectionProvider.Dispose();
this.CloseAsync(CancellationToken.None).Wait();
}

ServerBootstrap SetupServerBootstrap()
Expand All @@ -138,11 +141,10 @@ ServerBootstrap SetupServerBootstrap()

var bootstrap = new ServerBootstrap();
// multithreaded event loop that handles the incoming connection
IEventLoopGroup parentEventLoopGroup = new MultithreadEventLoopGroup(parentEventLoopCount);
this.parentEventLoopGroup = new MultithreadEventLoopGroup(parentEventLoopCount);
// multithreaded event loop (worker) that handles the traffic of the accepted connections
this.eventLoopGroup = new MultithreadEventLoopGroup(threadCount);

bootstrap.Group(parentEventLoopGroup, this.eventLoopGroup)
bootstrap.Group(this.parentEventLoopGroup, this.eventLoopGroup)
.Option(ChannelOption.SoBacklog, listenBacklogSize)
// Allow listening socket to force bind to port if previous socket is still in TIME_WAIT
// Fixes "address is already in use" errors
Expand Down Expand Up @@ -185,14 +187,15 @@ ServerBootstrap SetupServerBootstrap()
bridgeFactory));
}));

this.wsEventLoopGroup = new MultithreadEventLoopGroup(Environment.ProcessorCount);
var mqttWebSocketListener = new MqttWebSocketListener(
settings,
bridgeFactory,
this.authenticator,
this.usernameParser,
this.clientCredentialsFactory,
() => this.sessionProvider,
new MultithreadEventLoopGroup(Environment.ProcessorCount),
this.wsEventLoopGroup,
this.byteBufferAllocator,
AutoRead,
maxInboundMessageSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ await hostToStop.Match(

public void Dispose()
{
this.DisposeAsync().ConfigureAwait(false).GetAwaiter().GetResult();
}

public async ValueTask DisposeAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ public async Task CloseAsync(CancellationToken token)
Events.Closed();
}

public void Dispose() => this.CloseAsync(CancellationToken.None).Wait();
public void Dispose()
{
}

static class Events
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Service

public class CertificateRenewal : IDisposable
{
static readonly TimeSpan MaxRenewAfter = TimeSpan.FromMilliseconds(int.MaxValue);
static readonly TimeSpan DefaultMaxRenewAfter = TimeSpan.FromMilliseconds(int.MaxValue);
static readonly TimeSpan TimeBuffer = TimeSpan.FromMinutes(5);

readonly TimeSpan maxRenewAfter;
readonly EdgeHubCertificates certificates;
readonly ILogger logger;
readonly Timer timer;
readonly CancellationTokenSource cts;

public CertificateRenewal(EdgeHubCertificates certificates, ILogger logger)
public CertificateRenewal(EdgeHubCertificates certificates, ILogger logger, TimeSpan maxRenewAfter)
{
this.certificates = Preconditions.CheckNotNull(certificates, nameof(certificates));
this.logger = Preconditions.CheckNotNull(logger, nameof(logger));
this.cts = new CancellationTokenSource();
this.maxRenewAfter = maxRenewAfter;

TimeSpan timeToExpire = certificates.ServerCertificate.NotAfter - DateTime.UtcNow;
if (timeToExpire > TimeBuffer)
Expand All @@ -29,8 +31,8 @@ public CertificateRenewal(EdgeHubCertificates certificates, ILogger logger)
// This is the maximum value for the timer (~24 days)
// Math.Min unfortunately doesn't work with TimeSpans so we need to do the check manually
TimeSpan renewAfter = timeToExpire - (TimeBuffer / 2);
TimeSpan clamped = renewAfter > MaxRenewAfter
? MaxRenewAfter
TimeSpan clamped = renewAfter > this.maxRenewAfter
? this.maxRenewAfter
: renewAfter;
logger.LogInformation("Scheduling server certificate renewal for {0}.", DateTime.UtcNow.Add(renewAfter).ToString("o"));
logger.LogDebug("Scheduling server certificate renewal timer for {0} (clamped to Int32.MaxValue).", DateTime.UtcNow.Add(clamped).ToString("o"));
Expand Down Expand Up @@ -73,7 +75,7 @@ protected virtual void Dispose(bool disposing)
void Callback(object _state)
{
TimeSpan timeToExpire = this.certificates.ServerCertificate.NotAfter - DateTime.UtcNow;
if (timeToExpire > TimeBuffer)
if (timeToExpire > TimeBuffer && this.maxRenewAfter == DefaultMaxRenewAfter)
{
// Timer has expired but is not within the time window for renewal
// Reschedule the timer.
Expand All @@ -82,8 +84,8 @@ void Callback(object _state)
// This is the maximum value for the timer (~24 days)
// Math.Min unfortunately doesn't work with TimeSpans so we need to do the check manually
TimeSpan renewAfter = timeToExpire - (TimeBuffer / 2);
TimeSpan clamped = renewAfter > MaxRenewAfter
? MaxRenewAfter
TimeSpan clamped = renewAfter > this.maxRenewAfter
? this.maxRenewAfter
: renewAfter;
this.logger.LogDebug("Scheduling server certificate renewal timer for {0}.", DateTime.UtcNow.Add(clamped).ToString("o"));
this.timer.Change(clamped, Timeout.InfiniteTimeSpan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,12 @@ static async Task<int> MainAsync(IConfigurationRoot configuration)
TimeSpan shutdownWaitPeriod = TimeSpan.FromSeconds(configuration.GetValue("ShutdownWaitPeriod", DefaultShutdownWaitPeriod));
(CancellationTokenSource cts, ManualResetEventSlim completed, Option<object> handler) = ShutdownHandler.Init(shutdownWaitPeriod, logger);

double renewAfter = configuration.GetValue("ServerCertificateRenewAfterInMs", int.MaxValue);
renewAfter = renewAfter > int.MaxValue ? int.MaxValue : renewAfter;
TimeSpan maxRenewAfter = TimeSpan.FromMilliseconds(renewAfter);
using (IProtocolHead mqttBrokerProtocolHead = await GetMqttBrokerProtocolHeadAsync(experimentalFeatures, container))
using (IProtocolHead edgeHubProtocolHead = await GetEdgeHubProtocolHeadAsync(logger, configuration, experimentalFeatures, container, hosting))
using (var renewal = new CertificateRenewal(certificates, logger))
using (var renewal = new CertificateRenewal(certificates, logger, maxRenewAfter))
{
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public async Task StartsUpAndServes()
dynamic response = await PostAsync(content, this.url);

Assert.Equal(200, (int)response.result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -71,6 +73,7 @@ public async Task CannotStartTwice()
{
await sut.StartAsync();
await Assert.ThrowsAsync<InvalidOperationException>(async () => await sut.StartAsync());
await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -90,6 +93,7 @@ public async Task DeniesNoPasswordNorCertificate()
dynamic response = await PostAsync(content, this.url);

Assert.Equal(403, (int)response.result);
await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -112,6 +116,8 @@ public async Task DeniesBothPasswordAndCertificate()
dynamic response = await PostAsync(content, this.url);

Assert.Equal(403, (int)response.result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -132,6 +138,8 @@ public async Task DeniesBadCertificateFormat()
dynamic response = await PostAsync(content, this.url);

Assert.Equal(403, (int)response.result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -152,6 +160,8 @@ public async Task DeniesNoVersion()
dynamic response = await PostAsync(content, this.url);

Assert.Equal(403, (int)response.result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -173,6 +183,8 @@ public async Task DeniesBadVersion()
dynamic response = await PostAsync(content, this.url);

Assert.Equal(403, (int)response.result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -199,6 +211,8 @@ public async Task AcceptsGoodTokenDeniesBadToken()

response = await PostAsync(content, this.url);
Assert.Equal(200, (int)response.result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -224,6 +238,8 @@ public async Task AcceptsGoodThumbprintDeniesBadThumbprint()

response = await PostAsync(content, this.url);
Assert.Equal(200, (int)response.result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand Down Expand Up @@ -254,6 +270,8 @@ public async Task AcceptsGoodCaDeniesBadCa()

response = await PostAsync(content, this.url);
Assert.Equal(200, (int)response.result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -275,6 +293,8 @@ public async Task ReturnsDeviceIdentity()
var response = await PostAsync(content, this.url);
Assert.Equal(200, (int)response.result);
Assert.Equal("testhub/device", (string)response.identity);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -296,6 +316,8 @@ public async Task ReturnsModuleIdentity()
var response = await PostAsync(content, this.url);
Assert.Equal(200, (int)response.result);
Assert.Equal("testhub/device/module", (string)response.identity);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -310,6 +332,8 @@ public async Task AcceptsRequestWithContentLength()
var result = await SendDirectRequest(RequestBody);

Assert.StartsWith(@"{""result"":200,", result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -324,6 +348,8 @@ public async Task AcceptsRequestWithNoContentLength()
var result = await SendDirectRequest(RequestBody, withContentLength: false);

Assert.StartsWith(@"{""result"":200,", result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand All @@ -338,6 +364,8 @@ public async Task DeniesMalformedJsonRequest()
var result = await SendDirectRequest(NonJSONRequestBody);

Assert.StartsWith(@"{""result"":403,", result);

await sut.CloseAsync(CancellationToken.None);
}
}

Expand Down Expand Up @@ -379,6 +407,8 @@ public async Task StoresMetadataCorrectly()
var modelId = (await metadataStore.GetMetadata("device")).ModelId;
Assert.True(modelId.HasValue);
Assert.Equal(modelIdString, modelId.GetOrElse("impossibleValue"));

await sut.CloseAsync(CancellationToken.None);
}
}

Expand Down
12 changes: 10 additions & 2 deletions mqtt/mqttd/src/app/edgehub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,16 @@ impl Bootstrap for EdgeHubBootstrap {

fs::create_dir_all(state_dir.clone())?;
let mut persistor = FilePersistor::new(state_dir, VersionedFileFormat::default());
let state = persistor.load().await?;
info!("state loaded.");
let state = match persistor.load().await {
Ok(state) => {
info!("state loaded.");
state
}
Err(e) => {
error!("failed to load broker state, most likely the broker was forcefully shut down and state file is corrupted: {}", e);
None
}
};

let device_id = env::var(DEVICE_ID_ENV).context(DEVICE_ID_ENV)?;
let iothub_id = env::var(IOTHUB_HOSTNAME_ENV).context(IOTHUB_HOSTNAME_ENV)?;
Expand Down
12 changes: 10 additions & 2 deletions mqtt/mqttd/src/app/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,16 @@ impl Bootstrap for GenericBootstrap {

fs::create_dir_all(state_dir.clone())?;
let mut persistor = FilePersistor::new(state_dir, VersionedFileFormat::default());
let state = persistor.load().await?;
info!("state loaded.");
let state = match persistor.load().await {
Ok(state) => {
info!("state loaded.");
state
}
Err(e) => {
error!("failed to load broker state, most likely the broker was forcefully shut down and state file is corrupted: {}", e);
None
}
};

let broker = BrokerBuilder::default()
.with_authorizer(AllowAll)
Expand Down
Loading

0 comments on commit 3251412

Please sign in to comment.