-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Server
These samples are only valid for version 3. Please check the Samples directory for samples and documentation for version 4.0+
Creating a MQTT server is similar to creating a MQTT client. The following code shows the most simple way of creating a new MQTT server with a TCP endpoint which is listening at the default port 1883.
// Start a MQTT server.
var mqttServer = new MqttFactory().CreateMqttServer();
await mqttServer.StartAsync(new MqttServerOptions());
Console.WriteLine("Press any key to exit.");
Console.ReadLine();
await mqttServer.StopAsync();
Setting several options for the MQTT server is possible by setting the property values of the MqttServerOptions directly or via using the MqttServerOptionsBuilder (which is recommended). The following code shows how to use the MqttServerOptionsBuilder.
// Configure MQTT server.
var optionsBuilder = new MqttServerOptionsBuilder()
.WithConnectionBacklog(100)
.WithDefaultEndpointPort(1884);
var mqttServer = new MqttFactory().CreateMqttServer();
await mqttServer.StartAsync(optionsBuilder.Build());
The following code shows how to validate an incoming MQTT client connection request:
// Setup client validator.
var optionsBuilder = new MqttServerOptionsBuilder()
.WithConnectionValidator(c =>
{
if (c.ClientId.Length < 10)
{
c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
return;
}
if (c.Username != "mySecretUser")
{
c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}
if (c.Password != "mySecretPassword")
{
c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return;
}
c.ReasonCode = MqttConnectReasonCode.Success;
});
In order to use an encrypted connection a certificate including the private key is required. The following code shows how to start a server using a certificate for encryption:
using System.Reflection;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
...
var currentPath = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
var certificate = new X509Certificate2(Path.Combine(currentPath, "certificate.pfx"),"yourPassword", X509KeyStorageFlags.Exportable);
var optionsBuilder = new MqttServerOptionsBuilder()
.WithoutDefaultEndpoint() // This call disables the default unencrypted endpoint on port 1883
.WithEncryptedEndpoint()
.WithEncryptedEndpointPort(config.Port)
.WithEncryptionCertificate(certificate.Export(X509ContentType.Pfx))
.WithEncryptionSslProtocol(SslProtocols.Tls12)
But also other overloads getting a valid certificate blob (byte array) can be used.
For creating a self-signed certificate for testing the following command can be used (Windows SDK must be installed):
makecert.exe -sky exchange -r -n "CN=selfsigned.crt" -pe -a sha1 -len 2048 -ss My "test.cer"
OpenSSL can also be used to create a self-signed PFX certificate as described here.
openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365
openssl pkcs12 -export -out certificate.pfx -inkey key.pem -in cert.pem
var caFile = new FileInfo("root.crt");
var ca = new X509Certificate2(caFile.FullName);
options.TlsEndpointOptions.RemoteCertificateValidationCallback += (sender, cer, chain, sslPolicyErrors) =>
{
try
{
if (sslPolicyErrors == SslPolicyErrors.None)
{
return true;
}
if (sslPolicyErrors == SslPolicyErrors.RemoteCertificateChainErrors)
{
chain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck;
chain.ChainPolicy.VerificationFlags = X509VerificationFlags.NoFlag;
chain.ChainPolicy.ExtraStore.Add(ca);
chain.Build((X509Certificate2)cer);
return chain.ChainElements.Cast<X509ChainElement>().Any(a => a.Certificate.Thumbprint == ca.Thumbprint);
}
}
catch { }
return false;
};
The server is also able to publish MQTT application messages. The object is the same as for the client implementation. Due to the fact that the server is able to publish its own messages it is not required having a loopback connection in the same process.
This allows also running the server in a Windows IoT Core UWP app. This platform has a network isolation which makes it impossible to communicate via localhost etc.
Examples for publishing a message are described at the client section of this Wiki.
The server is also able to process every application message which was published by any client. The event ApplicationMessageReceived will be fired for every processed message. It has the same format as for the client but additionally has the ClientId.
Details for consuming a application messages are described at the client section of this Wiki.
The server supports retained MQTT messages. Those messages are kept and send to clients when they connect and subscribe to them. It is also supported to save all retained messages and loading them after the server has started. This required implementing an interface. The following code shows how to serialize retained messages as JSON:
// Setting the options
options.Storage = new RetainedMessageHandler();
// The implementation of the storage:
// This code uses the JSON library "Newtonsoft.Json".
public class RetainedMessageHandler : IMqttServerStorage
{
private const string Filename = "C:\\MQTT\\RetainedMessages.json";
public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
{
File.WriteAllText(Filename, JsonConvert.SerializeObject(messages));
return Task.FromResult(0);
}
public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync()
{
IList<MqttApplicationMessage> retainedMessages;
if (File.Exists(Filename))
{
var json = File.ReadAllText(Filename);
retainedMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json);
}
else
{
retainedMessages = new List<MqttApplicationMessage>();
}
return Task.FromResult(retainedMessages);
}
}
A custom interceptor can be set at the server options. This interceptor is called for every application message which is received by the server. This allows extending application messages before they are persisted (in case of a retained message) and before being dispatched to subscribers. This allows use cases like adding a time stamp to every application message if the hardware device does not know the time or time zone etc. The following code shows how to use the interceptor:
var optionsBuilder = new MqttServerOptionsBuilder()
.WithApplicationMessageInterceptor(context =>
{
if (context.ApplicationMessage.Topic == "my/custom/topic")
{
context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes("The server injected payload.");
}
// It is possible to disallow the sending of messages for a certain client id like this:
if (context.ClientId != "Someone")
{
context.AcceptPublish = false;
return;
}
// It is also possible to read the payload and extend it. For example by adding a timestamp in a JSON document.
// This is useful when the IoT device has no own clock and the creation time of the message might be important.
})
.Build();
If you want to stop processing an application message completely (like a delete) then the property context.ApplicationMessage.Payload must be set to null.
A custom interceptor can be set to control which topics can be subscribed by a MQTT client. This allows moving private API-Topics to a protected area which is only available for certain clients. The following code shows how to use the subscription interceptor.
// Protect several topics from being subscribed from every client.
var optionsBuilder = new MqttServerOptionsBuilder()
.WithSubscriptionInterceptor(context =>
{
if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin")
{
context.AcceptSubscription = false;
}
if (context.TopicFilter.Topic.StartsWith("the/secret/stuff") && context.ClientId != "Imperator")
{
context.AcceptSubscription = false;
context.CloseConnection = true;
}
})
.Build();
It is also supported to use an async method instead of a synchronized one like in the above example.
From version 3.0.6 and up, there is a Dictionary<object, object>
called SessionItems
. It allows to store custom data in the session and is available in all interceptors:
var optionsBuilder = new MqttServerOptionsBuilder()
.WithConnectionValidator(c => { c.SessionItems.Add("SomeData", true); }
.WithSubscriptionInterceptor(c => { c.SessionItems.Add("YourData", new List<string>{"a", "b"}); }
.WithApplicationMessageInterceptor(c => { c.SessionItems.Add("Test", 123); }
This library also has support for a WebSocket based server which is integrated into ASP.NET Core 2.0. This functionality requires an additional library called MQTTnet.AspNetCore. After adding this library a MQTT server can be added to a Kestrel HTTP server.
// In class _Startup_ of the ASP.NET Core 2.0 project.
public void ConfigureServices(IServiceCollection services)
{
// This adds a hosted mqtt server to the services
services.AddHostedMqttServer(builder => builder.WithDefaultEndpointPort(1883));
// This adds TCP server support based on System.Net.Socket
services.AddMqttTcpServerAdapter();
// This adds websocket support
services.AddMqttWebSocketServerAdapter();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
// This maps the websocket to an MQTT endpoint
app.UseMqttEndpoint();
// Other stuff
}
MQTTnet.AspNetCore is compatible with the abstractions present in ASP.NET Core 2.0 but it also offers a new TCP transport based on ASP.NET Core 2.1 Microsoft.AspNetCore.Connections.Abstractions. This transport is mutual exclusive with the old TCP transport so you may only add and use one of them. Our benchmark indicates that the new transport is up to 30 times faster.
// In class _Program_ of the ASP.NET Core 2.1 or 2.2 project.
private static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseKestrel(o => {
o.ListenAnyIP(1883, l => l.UseMqtt()); // MQTT pipeline
o.ListenAnyIP(5000); // Default HTTP pipeline
})
.UseStartup<Startup>()
.Build();
// In class _Startup_ of the ASP.NET Core 2.1 or 2.2 project.
public void ConfigureServices(IServiceCollection services)
{
//this adds a hosted mqtt server to the services
services.AddHostedMqttServer(builder => builder.WithDefaultEndpointPort(1883));
//this adds tcp server support based on Microsoft.AspNetCore.Connections.Abstractions
services.AddMqttConnectionHandler();
//this adds websocket support
services.AddMqttWebSocketServerAdapter();
}
In ASP.NET Core 3.1+, the server can be configured like this. Remember, that the TLS middleware connection is not yet available, so this will only work for WebSocket connections (Check https://github.com/chkr1011/MQTTnet/issues/464).
// In class _Program_ of the ASP.NET Core 3.1+ project.
private static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseKestrel(o => {
o.ListenAnyIP(1883, l => l.UseMqtt()); // MQTT pipeline
o.ListenAnyIP(5000); // Default HTTP pipeline
})
.UseStartup<Startup>()
.Build();
// In class _Startup_ of the ASP.NET Core 3.1+ project.
public void ConfigureServices(IServiceCollection services)
{
services
.AddHostedMqttServer(mqttServer => mqttServer.WithoutDefaultEndpoint())
.AddMqttConnectionHandler()
.AddConnections();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
app.UseEndpoints(endpoints =>
{
endpoints.MapMqtt("/mqtt");
});
app.UseMqttServer(server =>
{
// Todo: Do something with the server
});
}
In class Program of the ASP.NET 5.0 project:
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;
using MQTTnet.AspNetCore.Extensions;
public class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseKestrel(
o =>
{
o.ListenAnyIP(1883, l => l.UseMqtt()); // MQTT pipeline
o.ListenAnyIP(5000); // Default HTTP pipeline
});
webBuilder.UseStartup<Startup>();
});
}
In class Startup of the ASP.NET 5.0 project:
using System.Linq;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using MQTTnet.AspNetCore;
using MQTTnet.AspNetCore.Extensions;
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services
.AddHostedMqttServer(mqttServer => mqttServer.WithoutDefaultEndpoint())
.AddMqttConnectionHandler()
.AddConnections();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapConnectionHandler<MqttConnectionHandler>(
"/mqtt",
httpConnectionDispatcherOptions => httpConnectionDispatcherOptions.WebSockets.SubProtocolSelector =
protocolList =>
protocolList.FirstOrDefault() ?? string.Empty);
});
app.UseMqttServer(server =>
{
// Todo: Do something with the server
});
}
}
In Windows IoT Core as well as in UWP, loopback connections (127.0.0.1) are not allowed. If you try to connect to a locally running server (broker), this will fail. See Communicating with localhost (loopback) for enable loopback in Windows 10 IoT Core and UWP-apps.
Under Android, there is an issue with the default bound IP address. So you have to use the actual address of the device. Check the example below.
IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
IPAddress ipAddress = ipHostInfo.AddressList[0];
var server = new MqttFactory().CreateMqttServer();
server.StartAsync(new MqttServerOptionsBuilder()
.WithDefaultEndpointBoundIPAddress(ipAddress)
.WithDefaultEndpointBoundIPV6Address(IPAddress.None)
.Build()).GetAwaiter().GetResult();
If we have an ASP.NET Core application that needs to send MQTT messages from an MVC controller, the MqttService
singleton needs to be registered with dependency injection. The trick is to have two methods to correctly setup the MQTT part:
-
Have
MqttService
implement all the interfaces needed to hook with MqttServer (likeIMqttServerClientConnectedHandler
,IMqttServerApplicationMessageInterceptor
, etc.) -
Write a
ConfigureMqttServerOptions(AspNetMqttServerOptionsBuilder options)
method that sets up the current object as callback for the needed methods:
public void ConfigureMqttServerOptions(AspNetMqttServerOptionsBuilder options)
{
options.WithConnectionValidator(this);
options.WithApplicationMessageInterceptor(this);
}
- Write a
ConfigureMqttServer(IMqttServer mqtt)
that stores the reference to the MQTT server for later use and setup the handlers:
public void ConfigureMqttServer(IMqttServer mqtt)
{
this.mqtt = mqtt;
mqtt.ClientConnectedHandler = this;
mqtt.ClientDisconnectedHandler = this;
}
Then, in your Startup
class configure and use the service.
In ConfigureServices
:
services.AddSingleton<MqttService>();
services.AddHostedMqttServerWithServices(options => {
var s = options.ServiceProvider.GetRequiredService<MqttService>();
s.ConfigureMqttServerOptions(options);
});
services.AddMqttConnectionHandler();
services.AddMqttWebSocketServerAdapter();
In Configure
:
app.UseMqttEndpoint();
app.UseMqttServer(server => app.ApplicationServices.GetRequiredService<MqttService>().ConfigureMqttServer(server));
Add the AttributeRouting package to your project from https://www.nuget.org/packages/MQTTnet.AspNetCore.AttributeRouting
Modify your Startup.cs
class with the following:
public void ConfigureServices(IServiceCollection services)
{
// Identify and build routes for the current assembly
services.AddMqttControllers();
services
.AddHostedMqttServer(mqttServer => mqttServer.WithoutDefaultEndpoint())
.AddHostedMqttServerWithServices(mqttServer =>
{
// Optionally set server options here
mqttServer.WithoutDefaultEndpoint();
// Enable Attribute routing
mqttServer.WithAttributeRouting();
})
.AddMqttConnectionHandler()
.AddConnections();
}
Create your controllers by inheriting from MqttBaseController
. Use the attribute MqttRoute
on your assembly, controller or action to route messages whose topic matches the route to your actions.
Here is an example controller:
[MqttController]
[MqttRoute("[controller]")]
public class MqttWeatherForecastController : MqttBaseController
{
private readonly ILogger<MqttWeatherForecastController> _logger;
// Controllers have full support for dependency injection
public MqttWeatherForecastController(ILogger<MqttWeatherForecastController> logger)
{
_logger = logger;
}
// Supports template routing with typed constraints
[MqttRoute("{zipCode:int}/temperature")]
public Task WeatherReport(int zipCode)
{
// We have access to the MqttContext
if (zipCode != 90210)
{
MqttContext.CloseConnection = true;
}
// We have access to the raw message. There is no model binding (yet) so we get our message like
// this instead of using a [FromBody] attribute.
var temperature = BitConverter.ToDouble(Message.Payload);
_logger.LogInformation($"It's {temperature} degrees in Hollywood");
// Example validation
if (temperature <= 0 || temperature >= 130)
{
// Prevents the message from being published on the topic to any subscribers
return BadMessage();
}
// Publish the message to all subscribers on this topic
return Ok();
}
}
In the class Program.cs
, use the known setup (as described above already):
public static class Program
{
public static Task Main(string[] args)
{
return BuildWebHost(args).RunAsync();
}
private static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseKestrel(o =>
{
o.ListenAnyIP(1883, l => l.UseMqtt());
o.ListenAnyIP(5000); // default http pipeline
})
.UseStartup<Startup>()
.Build();
}
In the class Startup.cs
, use the following code:
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services
.AddHostedMqttServer(mqttServer => mqttServer.WithoutDefaultEndpoint())
.AddMqttConnectionHandler()
.AddConnections();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapMqtt("/mqtt");
});
app.UseMqttServer(server =>
{
server.StartedHandler = new MqttServerStartedHandlerDelegate(async args =>
{
var frameworkName = GetType().Assembly.GetCustomAttribute<TargetFrameworkAttribute>()?
.FrameworkName;
var msg = new MqttApplicationMessageBuilder()
.WithPayload($"Mqtt hosted on {frameworkName} is awesome")
.WithTopic("message");
while (true)
{
try
{
await server.PublishAsync(msg.Build());
msg.WithPayload($"Mqtt hosted on {frameworkName} is still awesome at {DateTime.Now}");
}
catch (Exception e)
{
Console.WriteLine(e);
}
finally
{
await Task.Delay(TimeSpan.FromSeconds(2));
}
}
});
});
app.Use((context, next) =>
{
if (context.Request.Path == "/")
{
context.Request.Path = "/Index.html";
}
return next();
});
app.UseStaticFiles();
app.UseStaticFiles(new StaticFileOptions
{
RequestPath = "/node_modules",
FileProvider = new PhysicalFileProvider(Path.Combine(env.ContentRootPath, "node_modules"))
});
}
}
Check the example under https://github.com/chkr1011/MQTTnet/blob/master/Source/MQTTnet.Server/Web/Startup.cs for a correct WebSocket implementation in Net5.0 as well.