NSQ is a realtime distributed messaging platform designed to operate at scale, handling billions of messages per day. It promotes distributed and decentralized topologies without single points of failure, enabling fault tolerance and high availability coupled with a reliable message delivery guarantee.
At a high level, the module can be used to:
- Integrate with an application to make real-time routing decisions (instead of using, say, a SQL database)
- Provide a real-time integration into your program, instead of your database, so you can overlay additional logic in your preferred language while also using a message bus
- Use NSQ as a distributed messaging layer, so that machines processing requests, responses, or events can go up or down or share the workload while your Kamailio node keeps working
Supported operations are:
- publish json payloads to nsq topics
- publish json payloads to nsq topics and wait for correlated response message
- subscribe to an nsq topic and channel and handle events from that channel
The NSQ module can also publish updates to the presence module through the nsq_pua_publish function.
This module is heavily based on the Kazoo module from 2600hz.
The module works with a main forked process that handles all communication with your NSQ system: issuing publishes, waiting for replies, and consuming messages. When it consumes a message, it hands the processing off to a worker process so that the main process is not blocked (it uses libev).
The worker process executes an event route in which you can act on the received payload. The name of the event route is composed of values extracted from the payload.
The NSQ module tries to execute the event routes from most specific to least specific. Define the event route as event_route[nsq:consumer-event[-payload_key_value[-payload_subkey_value]]]
...
modparam("nsq", "consumer_event_key", "Event-Type")
modparam("nsq", "consumer_event_subkey", "Event-Name")
...
event_route[nsq:consumer-event-presence-update]
{
# presence is the value extracted from Event-Type field in json payload
# update is the value extracted from Event-Name field in json payload
xlog("L_INFO", "received $(nqE{nq.json,Event-Package}) update for $(nqE{nq.json,From})");
...
}
event_route[nsq:consumer-event-presence]
{
# presence is the value extracted from Event-Type field in json payload
xlog("L_INFO", "received $(nqE{nq.json,Event-Package}) update for $(nqE{nq.json,From})");
...
}
event_route[nsq:consumer-event]
{
# this event route is executed if neither of the more specific routes above is defined
}
Consumed messages have the option of being acknowledged in two ways:
- immediately when received
- after processing by the worker
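The following modules must be loaded before this module:
- none
The following libraries or applications must be installed before running Kamailio with this module loaded:
- libev
- libjson
- libuuid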
The HTTP address of the nsqd instance to post messages to.
Default value is Null. You must set this parameter value for the module to work
Example
...
modparam("nsq", "nsqd_address", "127.0.0.1:4151")
...
The HTTP addresses of the NSQ lookupd servers (comma-separated).
Default value is Null. You must set this parameter value for the module to work
Example
...
modparam("nsq", "lookupd_address", "10.10.10.1:4161,10.10.10.2:4161")
...
The topic to listen on for inbound events
Example
...
modparam("nsq", "consumer_topic", "kamailio")
...
The channel to listen on for inbound events
Example
...
modparam("nsq", "consumer_channel", "sip-proxy-01")
...
The JSON property name to watch for when selecting event routes.
Default value is "Event-Type"
Example
...
modparam("nsq", "consumer_event_key", "Type")
...
The JSON property subkey name to watch for when selecting event routes.
Default value is "Event-Name"
Example
...
modparam("nsq", "consumer_event_subkey", "Name")
...
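With the two example values above, the route name is built from the "Type" and "Name" properties instead of the defaults. As a rough sketch, assuming a consumed payload such as {"Type": "presence", "Name": "update"} (a hypothetical message, not one defined by the module), the routes below would be looked up in the order shown:

```
modparam("nsq", "consumer_event_key", "Type")
modparam("nsq", "consumer_event_subkey", "Name")

# looked up first: key value and subkey value both match
event_route[nsq:consumer-event-presence-update]
{
    xlog("L_INFO", "presence update received\n");
}

# used if the route above is not defined: only the key value matches
event_route[nsq:consumer-event-presence]
{
    xlog("L_INFO", "presence event received\n");
}

# used if neither route above is defined
event_route[nsq:consumer-event]
{
    xlog("L_INFO", "unclassified NSQ event received\n");
}
```

Defining only the routes you need keeps the configuration short; the generic route is a convenient place to log unexpected event types.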
Number of messages the nsq client will handle concurrently
Default value is 100
Example
...
modparam("nsq", "max_in_flight", 5)
...
Number of seconds until timeout for query requests
Default value is 2
Example
...
modparam("nsq", "query_timeout", 5)
...
The database for the presentity table.
If set, the nsq_pua_publish function will update the presentity status in the database.
Default value is “NULL”.
Example
...
modparam("nsq", "db_url", "mysql://kamailio:kamailiorw@localhost/kamailio")
...
The name of the presentity table in the database.
Default value is “presentity”.
Example
...
modparam("nsq", "presentity_table", "my_presentity_table")
...
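Taken together, the parameters above might be combined as follows. This is a minimal sketch that reuses the example and default values from this section; the addresses, topic, and channel are placeholders, and the module file name nsq.so is an assumption based on the module name used in modparam.

```
loadmodule "nsq.so"

# NSQ endpoints (both must be set for the module to work)
modparam("nsq", "nsqd_address", "127.0.0.1:4151")
modparam("nsq", "lookupd_address", "10.10.10.1:4161,10.10.10.2:4161")

# topic and channel to consume inbound events from
modparam("nsq", "consumer_topic", "kamailio")
modparam("nsq", "consumer_channel", "sip-proxy-01")

# payload properties used to build event route names (documented defaults)
modparam("nsq", "consumer_event_key", "Event-Type")
modparam("nsq", "consumer_event_subkey", "Event-Name")

# concurrency and query timeout (documented defaults)
modparam("nsq", "max_in_flight", 100)
modparam("nsq", "query_timeout", 2)
```

The db_url and presentity_table parameters are left out here; they are only relevant when nsq_pua_publish should update the presentity table.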
The function publishes a json payload to the nsq topic passed in.
This function can be used from ANY ROUTE.
Example
...
$var(nsq_payload_request) = "{'Event-Type' : 'directory', 'Event-Name' : 'reg_success', 'Contact' : '" + $var(fs_contact) + "', 'Call-ID' : '" + $ci + "', 'Realm' : '" + $fd +"', 'Username' : '" + $fU + "', 'From-User' : '" + $fU + "', 'From-Host' : '" + $fd + "', 'To-User' : '" + $tU +"', 'To-Host' : '" + $td + "', 'User-Agent' : '" + $ua +"' ," + $var(register_contants)+ " }";
nsq_publish("registrations", $var(nsq_payload_request));
...
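The example above uses single quotes inside the payload. Strict JSON parsers only accept double-quoted strings, so if the consumer of the topic requires strict JSON, the payload can be built with escaped double quotes instead. A shorter sketch, assuming the same "registrations" topic and a reduced, purely illustrative set of fields:

```
# build a strict-JSON payload; \" escapes a double quote inside a config string
$var(nsq_payload_request) = "{\"Event-Type\": \"directory\", \"Event-Name\": \"reg_success\", \"Call-ID\": \"" + $ci + "\", \"Username\": \"" + $fU + "\", \"Realm\": \"" + $fd + "\"}";
nsq_publish("registrations", $var(nsq_payload_request));
```

Note that the inserted values are not JSON-escaped in this sketch, so a header value containing a double quote or backslash would still produce an invalid payload.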
The function publishes a json payload to nsq, waits for a correlated message and puts the result in target_var. target_var is optional as the function also puts the result in pseudo-variable $nqR.
This function can be used from ANY ROUTE.
Example
...
$var(nsq_payload_request) = "{'Event-Category' : 'call_event' , 'Event-Name' : 'query_user_channels_req', 'Realm' : '" + $fd + "', 'Username' : '" + $fU + "', 'Active-Only' : false }";
nsq_encode("$ci", "$var(callid_encoded)");
if(nsq_query("callevt", $var(nsq_payload_request), "$var(nsq_result)")) {
nsq_json("$var(nsq_result)", "Channels[0].switch_url", "$du");
if($du != $null) {
xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
return;
}
}
...
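Because target_var is optional, the reply can also be read from $nqR. A minimal sketch, reusing the payload variable from the example above and assuming, as that example does, that nsq_query evaluates to true when a correlated reply arrives:

```
if(nsq_query("callevt", $var(nsq_payload_request))) {
    # the reply is available in $nqR when no target_var is given
    xlog("L_INFO", "$ci|log|query reply: $nqR\n");
} else {
    # assumption: the call fails when no correlated reply arrives,
    # for example once query_timeout has expired
    xlog("L_WARN", "$ci|log|no reply received for query\n");
}
```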
The function builds the presentity state from json_payload and updates the presentity table.
This function can be used from ANY ROUTE.
Example
...
event_route[nsq:consumer-event-presence-update]
{
xlog("L_INFO", "received $(nqE{nq.json,Event-Package}) update for $(nqE{nq.json,From})");
nsq_pua_publish($nqE);
pres_refresh_watchers("$(nqE{nq.json,From})", "$(nqE{nq.json,Event-Package})", 1);
}
...
The function encodes the 1st parameter to JSON and puts the result in the 2nd parameter.
This function can be used from ANY ROUTE.
Example
...
# encode the Call-ID and store the result in $var(callid_encoded)
nsq_encode("$ci", "$var(callid_encoded)");
...