- Aedes
- new Aedes([options]) / new Aedes.Server([options])
- aedes.id
- aedes.connectedClients
- aedes.closed
- Event: client
- Event: clientReady
- Event: clientDisconnect
- Event: clientError
- Event: connectionError
- Event: keepaliveTimeout
- Event: publish
- Event: ack
- Event: ping
- Event: subscribe
- Event: unsubscribe
- Event: connackSent
- Event: closed
- aedes.handle (stream)
- aedes.subscribe (topic, deliverfunc, callback)
- aedes.unsubscribe (topic, deliverfunc, callback)
- aedes.publish (packet, callback)
- aedes.close ([callback])
- Handler: preConnect (client, packet, callback)
- Handler: authenticate (client, username, password, callback)
- Handler: authorizePublish (client, packet, callback)
- Handler: authorizeSubscribe (client, subscription, callback)
- Handler: authorizeForward (client, packet)
- Handler: published (packet, client, callback)
- options <object>
  - mq <MQEmitter> middleware used to deliver messages to subscribed clients. In a cluster environment it is also used to share messages between broker instances. Default: mqemitter
  - concurrency <number> maximum number of concurrent messages delivered by mq. Default: 100
  - persistence <Persistence> middleware that stores QoS > 0, retained, will packets and subscriptions. Default: aedes-persistence (in memory)
  - queueLimit <number> maximum number of queued messages before the client session is established. If the number of queued items exceeds this limit, connectionError is raised with the error Client queue limit reached. Default: 42
  - maxClientsIdLength <number> option to override the MQTT 3.1.0 client ID length limit. Default: 23
  - heartbeatInterval <number> interval in milliseconds at which the server publishes its health signal to the $SYS/<aedes.id>/heartbeat topic. Default: 60000
  - id <string> aedes broker unique identifier. Default: uuidv4()
  - connectTimeout <number> maximum time in milliseconds to wait for a CONNECT packet. Default: 30000
- Returns <Aedes>
Create a new Aedes server.
Aedes is the class and function exposed by this module. It can be created by calling Aedes() or by using new Aedes(). A variant, aedes.Server, is provided for TypeScript or ES modules.
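As a minimal sketch of the constructor options, the snippet below builds an options object from the documented defaults and passes it to the class. The id value 'broker-1' and the createBroker helper are hypothetical, and the class is injected as an argument because how it is imported may depend on the installed version.

```javascript
// Option names and default values come from the options list;
// the id 'broker-1' is a made-up example value.
const options = {
  id: 'broker-1',
  concurrency: 100,
  queueLimit: 42,
  maxClientsIdLength: 23,
  heartbeatInterval: 60000,
  connectTimeout: 30000
}

// Hypothetical helper: Aedes is the class exposed by the module,
// for example createBroker(require('aedes')).
function createBroker (Aedes) {
  return new Aedes(options)
}
```

Options that are left out (here mq and persistence) fall back to their documented defaults.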
<string>
Default: uuidv4()
Server unique identifier.
<number>
Default: 0
Number of connected clients in server.
<boolean>
Default: false
A read-only flag that indicates whether the server is closed.
- client <Client>
Emitted when the client registers itself to the server. The client is not ready yet; its connecting state equals true.
The server publishes to the SYS topic $SYS/<aedes.id>/new/clients to announce that it has registered the client in its registration pool. client.id is the payload.
- client <Client>
Emitted when the client has received all its offline messages and has been initialized. The client's connected state equals true and it is ready to process incoming messages.
- client <Client>
Emitted when a client disconnects.
The server publishes to the SYS topic $SYS/<aedes.id>/disconnect/clients to announce that it has deregistered the client. client.id is the payload.
- client <Client>
- error <Error>
Emitted when an error occurs.
- client <Client>
- error <Error>
Emitted when an error occurs. Unlike clientError, it is raised only when the client is uninitialized.
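The difference between the two error events can be sketched with a pair of listeners. The logErrors helper and the log function are hypothetical; the listener arguments (client, error) follow the parameter lists of the two events.

```javascript
// Hypothetical helper: wires both error events to a log function.
function logErrors (broker, log) {
  // Raised for clients that have already been initialized
  broker.on('clientError', (client, error) => {
    log(`client ${client.id}: ${error.message}`)
  })
  // Raised while the client is still uninitialized,
  // so client.id is not relied on here
  broker.on('connectionError', (client, error) => {
    log(`connection: ${error.message}`)
  })
}
```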
- client <Client>
Emitted when the client keepalive times out.
Emitted when the server delivers the packet to a subscribed client. If no clients are subscribed to the packet topic, the server still publishes the packet and emits the event. client is null when packet is an internal message such as the aedes heartbeat message or an LWT.
Note! packet is of the aedes-packet type. Some of its properties are internal to aedes; changing them will break the aedes internal flow.
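A listener for this event has to cope with client being null. The sketch below assumes the listener receives (packet, client) in that order; logPublishes and log are hypothetical names, and the packet is only read, never modified.

```javascript
// Hypothetical helper: attaches a 'publish' listener to a broker.
function logPublishes (broker, log) {
  broker.on('publish', (packet, client) => {
    // client is null for internal messages such as heartbeats and LWT
    const from = client ? client.id : 'broker'
    // Read-only access: packet properties are left untouched
    log(`${from} -> ${packet.topic}`)
  })
}
```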
Emitted on a QoS 1 or 2 acknowledgement, when the packet has been successfully delivered to the client.
Emitted when the client sends a PINGREQ.
- subscriptions <object>
- client <Client>
Emitted when the client successfully subscribes to the subscriptions on the server.
subscriptions is an array of { topic: topic, qos: qos }. The array excludes duplicated topics and includes negated subscriptions, where qos equals 128. See authorizeSubscribe for more.
The server publishes to the SYS topic $SYS/<aedes.id>/new/subscribers to announce that a client has successfully subscribed to one or more topics. The payload is a JSON object with clientId and subs props; subs equals the subscriptions array.
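Because the subscriptions array also carries negated entries, a listener that only cares about granted topics needs to filter on qos. This is a minimal sketch; trackSubscriptions and onGranted are hypothetical names.

```javascript
// Hypothetical helper: reports the topics a client was actually granted.
function trackSubscriptions (broker, onGranted) {
  broker.on('subscribe', (subscriptions, client) => {
    // qos 128 marks a negated subscription, so it is skipped
    const granted = subscriptions.filter((sub) => sub.qos !== 128)
    onGranted(client.id, granted.map((sub) => sub.topic))
  })
}
```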
- unsubscriptions Array<string>
- client <Client>
Emitted when the client successfully unsubscribes from the subscriptions on the server.
unsubscriptions is an array of unsubscribed topics.
The server publishes to the SYS topic $SYS/<aedes.id>/new/unsubscribers to announce that a client has successfully unsubscribed from one or more topics. The payload is a JSON object with clientId and subs props; subs equals the unsubscriptions array.
Emitted when the server sends an acknowledgement to the client. Please refer to the MQTT specification for an explanation of the returnCode object property in CONNACK.
Emitted when server is closed.
- stream: <net.Socket> | <stream.Duplex>
- Returns: <Client>
A connection listener that pipes the stream to aedes.
const aedes = require('./aedes')()
const server = require('net').createServer(aedes.handle)
- topic: <string>
- deliverfunc: <Function> (packet, cb) => void
  - packet: <aedes-packet> & PUBLISH
  - cb: <Function>
- callback: <Function>
Directly subscribe to a topic on the server side. Bypasses authorizeSubscribe.
The topic and deliverfunc form a compound key that identifies a subscription in the subscription pool. topic may be one that already exists; in that case deliverfunc is invoked just as it would be for a SUBSCRIBE.
deliverfunc supports backpressure.
Internally, aedes uses a deliverfunc to deliver messages to subscribed clients.
Note! packet is of the aedes-packet type. Some of its properties are internal to aedes; changing them will break the aedes internal flow.
In general, most properties in packet are the same as in the incoming PUBLISH. The cmd property of the packet passed to deliverfunc is always publish.
Note! deliverfunc must call cb before the function returns, otherwise some subscribed clients with the same topic will not receive messages.
callback is invoked when the server successfully registers the subscription.
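A server-side subscription can be sketched as follows. The subscribeCollector helper and the messages array are hypothetical; the sketch keeps a reference to deliverfunc, because the same function is needed again to unsubscribe, and it calls cb before returning as required.

```javascript
// Hypothetical helper: subscribes a server-side handler to a topic and
// returns the deliverfunc so it can later be passed to aedes.unsubscribe.
function subscribeCollector (broker, topic, messages, done) {
  function deliver (packet, cb) {
    // Copy only what is needed; the packet itself is not modified
    messages.push({ topic: packet.topic, payload: packet.payload.toString() })
    // cb is called before deliver returns
    cb()
  }
  broker.subscribe(topic, deliver, done)
  return deliver
}
```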
Reverse of aedes.subscribe.
Note! deliverfunc must be the same function that was passed to aedes.subscribe, otherwise the unsubscription will fail.
- packet: <object> PUBLISH
- callback: <Function> (error) => void
  - error: <Error> | null
Directly deliver packet on behalf of the server to subscribed clients. Bypasses authorizePublish.
callback is invoked with the error argument when delivery finishes.
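A minimal sketch of a server-originated publish follows. The publishText helper is hypothetical, and the packet fields are assumed to follow the usual MQTT PUBLISH packet shape (cmd, topic, payload, qos, retain).

```javascript
// Hypothetical helper: publishes a text payload on behalf of the server.
function publishText (broker, topic, text, done) {
  const packet = {
    cmd: 'publish',
    topic,
    payload: Buffer.from(text),
    qos: 0,
    retain: false
  }
  // done receives an error, or null on success
  broker.publish(packet, done)
}
```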
- callback:
<Function>
Close the aedes server and disconnect all clients.
callback is invoked when the server is closed.
- client: <Client>
- packet: <object> CONNECT
- callback: <Function> (error, successful) => void
  - error: <Error> | null
  - successful: <boolean>
Invoked when the server receives a valid CONNECT packet. The packet can be modified.
The client object is in its default state. If callback is invoked with no error and successful is true, the server continues to establish a session.
Any error is raised in the connectionError event.
Some Use Cases:
- Rate Limit / Throttle by client.conn.remoteAddress
- Check aedes.connectedClients to limit maximum connections
- IP blacklisting
aedes.preConnect = function (client, packet, callback) {
  // accept connections from the IPv6 loopback address only
  callback(null, client.conn.remoteAddress === '::1')
}
aedes.preConnect = function (client, packet, callback) {
  // pass an error; it is raised in the connectionError event
  callback(new Error('connection error'), client.conn.remoteAddress !== '::1')
}
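The connection-limit use case can be sketched with a small factory. makePreConnect and the maxClients parameter are hypothetical; the only broker property used is connectedClients.

```javascript
// Hypothetical factory: builds a preConnect handler that rejects new
// connections once maxClients clients are already connected.
function makePreConnect (broker, maxClients) {
  return function preConnect (client, packet, callback) {
    if (broker.connectedClients >= maxClients) {
      // The error is raised in the connectionError event
      return callback(new Error('too many connections'), false)
    }
    callback(null, true)
  }
}

// Assumed usage: aedes.preConnect = makePreConnect(aedes, 1000)
```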
- client: <Client>
- username: <string>
- password: <Buffer>
- callback: <Function> (error, successful) => void
  - error: <Error> | null
  - successful: <boolean>
Invoked after preConnect.
The server parses the CONNECT packet, initializes the client object, sets client.id to match the one in the CONNECT packet, and extracts username and password as parameters for the user-defined authentication flow.
If callback is invoked with no error and successful is true, the server authenticates the client and continues to set up the client session.
If authenticated, the server acknowledges with a CONNACK with returnCode=0, otherwise returnCode=5. Users can choose a value between 2 and 5 by defining a returnCode property on the error object.
aedes.authenticate = function (client, username, password, callback) {
  callback(null, username === 'matteo')
}
aedes.authenticate = function (client, username, password, callback) {
  const error = new Error('Auth error')
  error.returnCode = 4
  callback(error, null)
}
Please refer to Connect Return Code to see their meanings.
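Since password arrives as a Buffer, a credential check has to compare bytes rather than strings. The sketch below is one possible approach under stated assumptions: makeAuthenticate and the users map (username to plain-text password) are hypothetical and kept simple for illustration; a real deployment would more likely compare against stored hashes.

```javascript
const { timingSafeEqual } = require('node:crypto')

// Hypothetical factory: users maps a username to its expected password.
function makeAuthenticate (users) {
  return function authenticate (client, username, password, callback) {
    const expected = users[username]
    // password is a Buffer (or may be missing), so compare bytes;
    // timingSafeEqual requires both inputs to have the same length
    const ok = typeof expected === 'string' &&
      Buffer.isBuffer(password) &&
      password.length === Buffer.byteLength(expected) &&
      timingSafeEqual(password, Buffer.from(expected))
    if (ok) return callback(null, true)
    const error = new Error('Bad username or password')
    error.returnCode = 4 // CONNACK return code 4: bad user name or password
    callback(error, null)
  }
}

// Assumed usage: aedes.authenticate = makeAuthenticate({ matteo: 'secret' })
```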
- client: <Client> | null
- packet: <object> PUBLISH
- callback: <Function> (error) => void
  - error: <Error> | null
Invoked when
- publishing an LWT to all online clients
- receiving an incoming client publish
client is null when aedes publishes an obsolete LWT without connected clients.
If callback is invoked with no error, the server authorizes the packet; otherwise it emits clientError with the error. If an error occurs, the client connection is closed, but no error is returned to the client (MQTT-3.3.5-2).
aedes.authorizePublish = function (client, packet, callback) {
  if (packet.topic === 'aaaa') {
    return callback(new Error('wrong topic'))
  }
  if (packet.topic === 'bbb') {
    packet.payload = Buffer.from('overwrite packet payload')
  }
  callback(null)
}
By default authorizePublish returns an error when a client publishes to a topic with the $SYS/ prefix, to prevent possible DoS (see #597). If you write your own implementation of authorizePublish, we suggest adding a check for this. Default implementation:
function defaultAuthorizePublish (client, packet, callback) {
  if (packet.topic.startsWith($SYS_PREFIX)) {
    return callback(new Error($SYS_PREFIX + ' topic is reserved'))
  }
  callback(null)
}
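A custom handler that keeps the $SYS/ check while adding its own rule might look like the sketch below. makeAuthorizePublish, the allowedPrefix map (client id to permitted topic prefix) and the local SYS_PREFIX constant are all hypothetical names introduced for illustration.

```javascript
// Local stand-in for the reserved prefix used by the default handler
const SYS_PREFIX = '$SYS/'

// Hypothetical factory: allowedPrefix maps a client id to the topic
// prefix that client may publish under.
function makeAuthorizePublish (allowedPrefix) {
  return function authorizePublish (client, packet, callback) {
    // Keep the default protection for reserved topics
    if (packet.topic.startsWith(SYS_PREFIX)) {
      return callback(new Error(SYS_PREFIX + ' topic is reserved'))
    }
    // client is null when aedes publishes an LWT without a connected client
    if (client === null) return callback(null)
    const prefix = allowedPrefix[client.id]
    if (typeof prefix !== 'string' || !packet.topic.startsWith(prefix)) {
      return callback(new Error('publish not allowed'))
    }
    callback(null)
  }
}

// Assumed usage: aedes.authorizePublish = makeAuthorizePublish({ sensor1: 'sensors/1/' })
```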
- client: <Client>
- subscription: <object>
- callback: <Function> (error, subscription) => void
  - error: <Error> | null
  - subscription: <object> | null
Invoked when
- restoring subscriptions in a non-clean session
- receiving an incoming client SUBSCRIBE
subscription is a dictionary object like { topic: 'hello', qos: 0 }.
If callback is invoked with no error, the server authorizes the packet; otherwise it emits clientError with the error.
In general the user should leave the subscription untouched and pass it to callback, but the server gives the option to change the subscription on the fly.
aedes.authorizeSubscribe = function (client, sub, callback) {
  if (sub.topic === 'aaaa') {
    return callback(new Error('wrong topic'))
  }
  if (sub.topic === 'bbb') {
    // overwrites subscription
    sub.topic = 'foo'
    sub.qos = 1
  }
  callback(null, sub)
}
To negate a subscription, set the subscription to null. Aedes ignores the negated subscription and the qos in SubAck is set to 128, based on the MQTT 3.1.1 spec:
aedes.authorizeSubscribe = function (client, sub, callback) {
  // prohibited to subscribe 'aaaa' and suppress error
  callback(null, sub.topic === 'aaaa' ? null : sub)
}
Invoked when
- aedes sends retained messages when a client reconnects
- aedes pre-delivers a subscribed message to clients
Returning null will not forward the packet to clients.
In general the user should leave the packet untouched and return it as it is, but the server gives the option to change the packet on the fly and forward it to clients.
Note! packet is of the aedes-packet type. Some of its properties are internal to aedes; changing them will break the aedes internal flow.
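aedes.authorizeForward = function (client, packet) {
  if (packet.topic === 'aaaa' && client.id === 'I should not see this') {
    // returning null does not forward the packet to this client
    return null
  }
  if (packet.topic === 'bbb') {
    // Buffer.from replaces the deprecated new Buffer()
    packet.payload = Buffer.from('overwrite packet payload')
  }
  return packet
}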
Same as Event: publish, but provides backpressure. TL;DR: if an operation on a packet MUST finish before the next packet is handled, use this handler; otherwise, especially for long-running operations, use Event: publish instead.
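The backpressure behaviour can be sketched by deferring callback until some per-packet work has finished. makePublished and the store(record, cb) writer are hypothetical; client is treated as possibly null, on the assumption that it behaves as in Event: publish.

```javascript
// Hypothetical factory: store(record, cb) is an assumed asynchronous
// writer that calls cb(error) once the record has been persisted.
function makePublished (store) {
  return function published (packet, client, callback) {
    const record = {
      topic: packet.topic,
      clientId: client ? client.id : null
    }
    // callback is passed to store, so it runs only after the write finishes
    store(record, callback)
  }
}

// Assumed usage: aedes.published = makePublished(writeToDatabase)
```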