Connections
This assumes you’ve read both the Channels and Pipelines articles. If you haven’t, start there.
Under the covers, though, channels are made of two halves, an emitter and a receiver. Messages are enqueued into the receiver, and read from the emitter. In a channel created using `(channel)`, the same node is both the receiver and the emitter. That means that when we enqueue a message into a channel, we can read that same message out:
```clojure
> (def ch (channel))
#'ch
> (enqueue ch "hello")
<< ... >>
> (read-channel ch)
<< "hello" >>
```
However, it’s not often that we want to receive a message from ourselves. Rather, most of the time for a given channel we’re only sending or receiving, and the inverse operation is being done somewhere else.
Now consider the case of a network socket, where we can both send and receive messages. Since channels are unidirectional, to model bidirectional communication we have to use two channels, one for sending and the other for receiving. However, we’re only using half of each channel, and by having two we run the risk of using the send channel for receiving, or vice versa.
Ideally we’d like to merge the two halves from the two channels into a single, coherent object. And, as it turns out, this is exactly what `(splice emitter receiver)` does. `splice` in effect returns a proxy channel, which will forward all messages to `receiver`, and all message consumption to `emitter`.
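Here’s a minimal sketch of that behavior, using two ordinary channels as the two halves:

```clojure
(let [emitter  (channel)
      receiver (channel)
      duplex   (splice emitter receiver)]
  (enqueue duplex "hi")      ;; forwarded into receiver
  (read-channel receiver)    ;; << "hi" >>
  (enqueue emitter "hello")
  (read-channel duplex))     ;; << "hello" >>
```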
However, rather than use `splice` ourselves, it’s often convenient to just call `(channel-pair)`, which will return a list of two channels. A message enqueued into one channel can be received from the other, and vice versa. Lamina uses one of these paired channels (it doesn’t matter which) to represent a bidirectional socket.
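A quick sketch:

```clojure
;; whatever goes into one side comes out the other
(let [[a b] (channel-pair)]
  (enqueue a "ping")
  (read-channel b)    ;; << "ping" >>
  (enqueue b "pong")
  (read-channel a))   ;; << "pong" >>
```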
A connection to a server is represented by an async-promise. If the connection is successful, the async-promise will be realized as a bidirectional channel that can be used to communicate with the server. When the connection closes, the channel will close.
Here is some code to simulate a connection to an echo server:
```clojure
(use 'lamina.core)

(defn connect-to-echo-server []
  (let [[client server] (channel-pair)]
    (siphon server server)
    (success-result client)))
```
The `server` side of the channel pair siphons received data back to the `client`. The `(success-result)` call converts the `client` channel into an async-promise which is immediately realized.
Let’s say that we want to send a message to a server, and wait for the response. A synchronous approach would look like this:
```clojure
(def ch (wait-for-result (connect-to-echo-server)))
(enqueue ch "hello")
(wait-for-message ch)
```
This will wait forever for a response from the server, but a timeout can be set on both `wait-for-result` and `wait-for-message`. If the timeout elapses, an exception will be thrown.
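For example, giving each step one second (timeouts are in milliseconds):

```clojure
;; throws if the connection or the response takes longer than 1000 ms
(def ch (wait-for-result (connect-to-echo-server) 1000))
(enqueue ch "hello")
(wait-for-message ch 1000)
```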
The asynchronous approach is very similar:
```clojure
(run-pipeline (connect-to-echo-server)
  (fn [ch]
    (enqueue ch "hello")
    (read-channel ch)))
```
This call to `run-pipeline` will return an async-promise. If it fails to connect, or the connection closes before a response is received, it will emit an error (the simulated connection used here will never fail to connect or close prematurely). A timeout can also be specified for `read-channel`; if the timeout elapses, the async-promise will emit an error.
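As a sketch, assuming the timeout is given in milliseconds as a second argument to `read-channel` (the exact arity may vary between Lamina versions):

```clojure
(run-pipeline (connect-to-echo-server)
  (fn [ch]
    (enqueue ch "hello")
    ;; the async-promise emits an error if no response arrives within 1000 ms
    (read-channel ch 1000)))
```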
A server must specify a handler function, which will be called every time a new connection is opened. The handler function must take two parameters, a channel and information about the connection.
Let’s consider a simple handler function that takes the first message from the client, and echoes it back to them:
```clojure
(defn handler [ch info]
  (receive ch #(enqueue ch %)))
```
But this only sets up a response to the first message. All subsequent messages will simply queue up in the channel until the connection closes, at which point everything will be garbage collected. Instead, we want to specify how we respond to all messages. This will echo back all messages to the client:
```clojure
(defn handler [ch info]
  (siphon ch ch))
```
This takes all messages that come out of the channel, and enqueues them back into it.
As a minor variation, we can assume that the first message we receive is the name of the client, and then prepend that name to every message we echo back:

```clojure
(defn handler [ch info]
  (receive ch
    (fn [name]
      (siphon
        (map* #(str name ": " %) ch)
        ch))))
```
This consumes the first message from the client, and uses the information from that message to set up the permanent handler for the client.
It’s also possible to pass messages between clients. Here’s a simple chat server:
```clojure
(def broadcast-channel (permanent-channel))

(defn handler [ch info]
  (siphon ch broadcast-channel)
  (siphon broadcast-channel ch))
```
This takes all messages from each client, and passes them into a single broadcast channel. The output of the broadcast channel is siphoned into every client, meaning that everyone receives their own messages, along with everyone else’s.
Given a connection function that follows the standard API (i.e. returns a result-channel that emits a channel), we can build higher-level abstractions on top of it. All functions listed here are in the `lamina.connections` namespace.
Given a zero-parameter function which connects to a server, `(persistent-connection connect-fn)` will return a zero-parameter function that also returns a result-channel that emits a channel. Under the covers, however, it will only call the connection function often enough to keep a single open connection to the server.
To close the connection, call `(close-connection fn)`, passing in the function returned by `persistent-connection`. To reset the connection (close it and allow it to be automatically reopened), call `(reset-connection fn)`.
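A minimal sketch, reusing the simulated `connect-to-echo-server` from above:

```clojure
(use 'lamina.connections)

;; every call to `connection` yields an async-promise of the same open channel
(def connection (persistent-connection connect-to-echo-server))

(run-pipeline (connection)
  (fn [ch]
    (enqueue ch "hello")
    (read-channel ch)))

(reset-connection connection)   ;; drop the current connection; it will reopen on demand
(close-connection connection)   ;; shut it down for good
```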
Given a zero-parameter function which connects to a server, `(client connect-fn)` returns a function which takes a request and an optional timeout, and returns an async-promise that will emit the server’s response. The function may be called as often as we like, but requests will only be sent to the server one at a time; each new request will only be sent once the previous response comes back. `(pipelined-client connect-fn)` works the same as `client`, but requests will be sent as soon as they are made, under the assumption that responses will return in the same order.
Each client function represents a persistent connection to the server. To close the connection, pass the client function into `close-connection`, and to reset it use `reset-connection`.
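A sketch, again using the simulated echo server:

```clojure
;; a request/response client over the simulated echo server
(def echo-client (client connect-to-echo-server))

(echo-client "hello")        ;; << "hello" >>
(echo-client "hello" 1000)   ;; emits an error if no response arrives within 1000 ms

;; same calling convention, but requests go out without waiting for prior responses
(def pipelined-echo-client (pipelined-client connect-to-echo-server))

(close-connection echo-client)
```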
Both `client` and `pipelined-client` can take an options map as a second parameter. This map may contain the following keys (a usage sketch follows the table):
| option | description |
|---|---|
| `:name` | if defined, instruments the request function, creating probes within that namespace |
| `:implicit?` | whether the instrumentation will be captured by higher-level functions, defaults to true |
| `:probes` | a map of sub-names (with `:name` implicitly prefixed) onto channels that consume those probes |
| `:on-connected` | a callback for when connection-generator creates a new connection, which will be invoked with the new socket channel |
| `:executor` | an executor which will be used for response handling |
| `:retry?` | whether requests which fail due to connection failure will automatically retry; this should only be done if the requests are idempotent, and defaults to false |
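For example (the option values here are illustrative):

```clojure
;; a named client that retries requests after a connection failure
(def echo-client
  (client connect-to-echo-server
    {:name "echo-client"
     :retry? true}))
```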
There is also an optional `:heartbeat` option, which describes a periodic request that should be made to check connection health. Failure of this health check will cause the connection to be reset. Its value should be a map with the following keys (a sketch follows the table):
| option | description |
|---|---|
| `:request` | the heartbeat request |
| `:interval` | the time between heartbeat requests in milliseconds, defaults to ten seconds |
| `:timeout` | the max time spent waiting for the heartbeat response in milliseconds, defaults to five seconds |
| `:response-validator` | a predicate which takes a response, and returns true if it’s valid; defaults to always returning true |
| `:on-failure` | a callback which is invoked with no parameters when the heartbeat fails |
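A sketch of a heartbeat spec for the echo server (all values here are illustrative):

```clojure
;; ping every 10 seconds, and reset if no valid reply arrives within 5 seconds
(def echo-client
  (client connect-to-echo-server
    {:name "echo-client"
     :heartbeat {:request "ping"
                 :interval 10000
                 :timeout 5000
                 :response-validator #(= "ping" %)
                 :on-failure #(println "heartbeat failed, connection will be reset")}}))
```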
Given a channel representing a connection to a client, `(server channel handler-fn)` will process the requests from the client one at a time. Each time a request is made, the handler function will be passed a result-channel which accepts the response, and a request. The response can be made at any point, and only once it’s made will the handler function receive the next request.
`pipelined-server` follows the same interface as `server`, except that the handler function is called as soon as each request comes in. The handler function can respond to each request at any time, but responses are sent to clients in the same order the requests are received. This means that if a complex request is made, all subsequent requests cannot receive their responses until the first is completed.
These are counterparts of `client` and `pipelined-client`. If half of a channel-pair is given to each, then they can be used to model not only network communication, but also message flow between different parts of the same process.
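Here’s a sketch that wires `server` and `client` together over a channel-pair, entirely in-process. It assumes the handler realizes its result-channel with `success!`; the exact realization function may vary between Lamina versions.

```clojure
(let [[client-side server-side] (channel-pair)]
  ;; the server half: respond to each request by prefixing "echo: "
  (server server-side
    (fn [response request]
      ;; realizing the result-channel releases the next request to the handler
      (success! response (str "echo: " request))))
  ;; the client half: a connection function that simply hands back our channel
  (let [echo-client (client (fn [] (success-result client-side)))]
    (echo-client "hello")))   ;; << "echo: hello" >>
```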
Both `server` and `pipelined-server` can take an options map as a third parameter. This map may contain the following keys (a sketch follows the table):
| option | description |
|---|---|
| `:name` | if defined, instruments the handler function, creating probes within that namespace. This simplifies the handler as a transformation of request → response, eliding the `result-channel` and treating the time spent waiting for it to become realized as part of the handler’s invocation. |
| `:implicit?` | whether the instrumentation will be captured by higher-level functions, defaults to true |
| `:probes` | a map of sub-names (with `:name` implicitly prefixed) onto channels that consume those probes |
| `:executor` | the executor for the handler function |
| `:result-channel-generator` | a function that takes zero parameters and returns the result-channel that will be passed into the handler |
| `:error-response` | takes an exception thrown by the handler function, and returns an appropriate error response. If this isn’t defined, the server will simply return the exception. |
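For example, given a `server-side` channel like the one in the sketch above (the option values are illustrative, and `success!` is assumed as before):

```clojure
;; a named server whose handler exceptions become plain error-message responses
(server server-side
  (fn [response request]
    (success! response (str "echo: " request)))
  {:name "echo-server"
   :error-response (fn [ex] (str "error: " (.getMessage ex)))})
```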