Skip to content

Commit

Permalink
Update CloudNative.CloudEvents.Mqtt to MQTTnet version 4.3.6.1152
Browse files Browse the repository at this point in the history
This involves a new major version of CloudNative.CloudEvents.Mqtt.

Signed-off-by: Jon Skeet <jonskeet@google.com>
  • Loading branch information
jskeet committed Aug 1, 2024
1 parent c18d2ca commit 3872795
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
<Description>MQTT extensions for CloudNative.CloudEvents</Description>
<PackageTags>cncf;cloudnative;cloudevents;events;mqtt</PackageTags>
<LangVersion>8.0</LangVersion>
<Version>3.$(MinorVersion).$(PatchVersion)</Version>
<!-- After the first release of v3, we'll change the major here to 3. -->
<PackageValidationBaselineVersion>2.$(PackageValidationMinor).0</PackageValidationBaselineVersion>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MQTTnet" Version="3.0.15" />
<PackageReference Include="MQTTnet" Version="4.3.6.1152" />
<ProjectReference Include="..\CloudNative.CloudEvents\CloudNative.CloudEvents.csproj" />
</ItemGroup>

Expand Down
10 changes: 5 additions & 5 deletions src/CloudNative.CloudEvents.Mqtt/MqttExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Cloud Native Foundation.
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.

Expand Down Expand Up @@ -39,10 +39,10 @@ public static CloudEvent ToCloudEvent(this MqttApplicationMessage message,
Validation.CheckNotNull(message, nameof(message));

// TODO: Determine if there's a sensible content type we should apply.
return formatter.DecodeStructuredModeMessage(message.Payload, contentType: null, extensionAttributes);
return formatter.DecodeStructuredModeMessage(message.PayloadSegment, contentType: null, extensionAttributes);
}

// TODO: Update to a newer version of MQTTNet and support both binary and structured mode?
// TODO: Support both binary and structured mode.
/// <summary>
/// Converts a CloudEvent to <see cref="MqttApplicationMessage"/>.
/// </summary>
Expand All @@ -61,11 +61,11 @@ public static MqttApplicationMessage ToMqttApplicationMessage(this CloudEvent cl
return new MqttApplicationMessage
{
Topic = topic,
Payload = BinaryDataUtilities.AsArray(formatter.EncodeStructuredModeMessage(cloudEvent, out _))
PayloadSegment = BinaryDataUtilities.GetArraySegment(formatter.EncodeStructuredModeMessage(cloudEvent, out _))
};
default:
throw new ArgumentOutOfRangeException(nameof(contentMode), $"Unsupported content mode: {contentMode}");
}
}
}
}
}
11 changes: 10 additions & 1 deletion src/CloudNative.CloudEvents/Core/BinaryDataUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,16 @@ public static byte[] AsArray(ReadOnlyMemory<byte> memory)
}

// Note: when this returns, the Array property of the returned segment is guaranteed not to be null.
private static ArraySegment<byte> GetArraySegment(ReadOnlyMemory<byte> memory) =>

/// <summary>
/// Returns the data from <paramref name="memory"/> as a byte array, return the underlying array
/// if there is one, or creating a copy otherwise. This method should be used with care, due to the
/// "sometimes shared, sometimes not" nature of the result. (It is generally safe to use this with the result
/// of encoding a CloudEvent, assuming the same memory is not used elsewhere.)
/// </summary>
/// <param name="memory">The memory to obtain the data from.</param>
/// <returns>The data in <paramref name="memory"/> as an array segment.</returns>
public static ArraySegment<byte> GetArraySegment(ReadOnlyMemory<byte> memory) =>
MemoryMarshal.TryGetArray(memory, out var segment) && segment.Array is not null
? segment
: new ArraySegment<byte>(memory.ToArray());
Expand Down
22 changes: 12 additions & 10 deletions test/CloudNative.CloudEvents.UnitTests/Mqtt/MqttTest.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
// Copyright (c) Cloud Native Foundation.
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.

using CloudNative.CloudEvents.NewtonsoftJson;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Server;
using System;
using System.Net.Mime;
Expand All @@ -18,16 +16,17 @@ namespace CloudNative.CloudEvents.Mqtt.UnitTests
{
public class MqttTest : IDisposable
{
private readonly IMqttServer mqttServer;
private readonly MqttServer mqttServer;

public MqttTest()
{
var optionsBuilder = new MqttServerOptionsBuilder()
.WithConnectionBacklog(100)
.WithDefaultEndpoint()
.WithDefaultEndpointPort(52355);

this.mqttServer = new MqttFactory().CreateMqttServer();
mqttServer.StartAsync(optionsBuilder.Build()).GetAwaiter().GetResult();
this.mqttServer = new MqttFactory().CreateMqttServer(optionsBuilder.Build());
mqttServer.StartAsync().GetAwaiter().GetResult();
}

public void Dispose()
Expand Down Expand Up @@ -55,14 +54,17 @@ public async Task MqttSendTest()

var options = new MqttClientOptionsBuilder()
.WithClientId("Client1")
.WithTcpServer("localhost", 52355)
.WithTcpServer("127.0.0.1", 52355)
.WithCleanSession()
.Build();

TaskCompletionSource<CloudEvent> tcs = new TaskCompletionSource<CloudEvent>();
await client.ConnectAsync(options);
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(
args => tcs.SetResult(args.ApplicationMessage.ToCloudEvent(jsonEventFormatter)));
client.ApplicationMessageReceivedAsync += args =>
{
tcs.SetResult(args.ApplicationMessage.ToCloudEvent(jsonEventFormatter));
return Task.CompletedTask;
};

var result = await client.SubscribeAsync("abc");
await client.PublishAsync(cloudEvent.ToMqttApplicationMessage(ContentMode.Structured, new JsonEventFormatter(), topic: "abc"));
Expand All @@ -79,4 +81,4 @@ public async Task MqttSendTest()
Assert.Equal("value", (string?) receivedCloudEvent["comexampleextension1"]);
}
}
}
}

0 comments on commit 3872795

Please sign in to comment.