This document describes the key components of LuaRadio and how they interact.
Blocks provide a process(...)
method that accepts vectors of input samples as
arguments, processes them, and returns vectors of output samples. Blocks may
retain state.
Input sample vectors must be processed in their entirety by blocks in
process(...). Samples across multiple inputs are synchronized, meaning that
the multiple input vectors are always of the same length, and samples across
the input vectors all correspond to the same timestep.
Output sample vectors may be produced by blocks asynchronously, meaning that
there is no requirement on the number of output samples produced, or on which
call to process(...) produces them.
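As an illustration of asynchronous output, here is a minimal sketch in plain Lua, with ordinary tables standing in for sample vectors. The `Decimator` name and its fields are hypothetical and not part of LuaRadio. The point is only that a block consumes its whole input on every call while carrying state across calls, so the number of outputs per call can vary, including zero.

```lua
-- Hypothetical block that keeps every third sample. Plain Lua tables
-- stand in for LuaRadio sample vectors.
local Decimator = {}
Decimator.__index = Decimator

function Decimator.new(factor)
    return setmetatable({factor = factor, index = 0}, Decimator)
end

-- Consumes the whole input vector; may return zero or more outputs.
function Decimator:process(x)
    local out = {}
    for i = 1, #x do
        if self.index == 0 then
            out[#out + 1] = x[i]
        end
        -- State retained across calls to process()
        self.index = (self.index + 1) % self.factor
    end
    return out
end

local block = Decimator.new(3)
local first = block:process({10, 11})          -- keeps 10
local second = block:process({12})             -- keeps nothing
local third = block:process({13, 14, 15, 16})  -- keeps 13 and 16
print(#first, #second, #third)
```

Each call sees only the samples it was given, yet the decimation phase is continuous across calls because it lives in the block's state.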
Vectors of samples are serialized between output and input ports of blocks over anonymous UNIX sockets.
Vectors of CStruct
data type samples are
serialized raw, in their native memory representation, with no marshalling or
unmarshalling.
Vectors of Object
data type samples are
marshalled and unmarshalled with MessagePack and
serialized as byte strings, preceded by their length.
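The length-prefixed framing used for Object samples can be sketched in plain Lua as follows. This is only an illustration: the 4-byte big-endian prefix is an assumption (the width and byte order used by LuaRadio are not stated here), the function names are hypothetical, and the payloads are arbitrary byte strings rather than real MessagePack output.

```lua
-- Frame a byte string with a 4-byte big-endian length prefix.
-- The prefix width and byte order are assumptions for illustration.
local function frame(payload)
    local n = #payload
    local b1 = math.floor(n / 16777216) % 256
    local b2 = math.floor(n / 65536) % 256
    local b3 = math.floor(n / 256) % 256
    local b4 = n % 256
    return string.char(b1, b2, b3, b4) .. payload
end

-- Split a buffer of concatenated frames back into payloads.
local function unframe(buffer)
    local payloads, pos = {}, 1
    while pos + 3 <= #buffer do
        local b1, b2, b3, b4 = string.byte(buffer, pos, pos + 3)
        local n = ((b1 * 256 + b2) * 256 + b3) * 256 + b4
        payloads[#payloads + 1] = string.sub(buffer, pos + 4, pos + 3 + n)
        pos = pos + 4 + n
    end
    return payloads
end

-- Three variable-size "samples" serialized back to back
local stream = frame("abc") .. frame("") .. frame("hello")
local samples = unframe(stream)
print(#samples)
```

The prefix is what lets the reader find sample boundaries in a stream of variable-size samples; fixed-size CStruct samples need no such framing.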
LuaRadio uses multi-processing. Every block is run in its own process under its own Lua state.
Blocks do not share memory. Blocks use IPC to serialize samples to other blocks and communicate with applications.
Memory for input samples is allocated once. A persistent buffer is allocated
for each block input port, where raw input samples are read into. The raw
input samples are cast into read-only vectors, which are provided as the
arguments to the block's process(...)
method.
Memory for output samples is typically allocated once by each block. Blocks
are responsible for managing their own output sample memory. Most blocks
create a persistent output sample vector in their initialize()
method, and
resize it as needed in process(...)
to store the computed output samples.
However, this is not strictly enforced by the framework.
Blocks that do reuse output sample vectors have constant memory usage in the
steady state. The output sample vectors approach a stable size, as they are
resized in process(...)
to accommodate the input vectors the block consumes.
All objects in LuaRadio are garbage collected, although this mainly applies to transient objects. The input and output sample memory in the situations described above is anchored throughout the processing lifetime of the block.
Moving a vector of samples between blocks has an overhead of two copies: one from writing it to the UNIX socket in the producing block, and one from reading it from the UNIX socket in the consuming block. This overhead could be reduced to one copy, by changing the sample transport from UNIX sockets to a shared memory circular buffer (not unlike GNU Radio).
The overhead could be further reduced to zero copies, if the vectors for output
samples were not allocated by the block, but instead pointers into the
persistent shared memory circular buffer (also not unlike GNU Radio) that is
shared between the output and input port. Implementing this would likely
require a different process(...) signature that includes the output vectors
as arguments, and a new resizing mechanism for output vectors.
Multi-processing instead of multi-threading for concurrency incurs some memory and CPU overhead from requiring a process for each block. However, multi-threading poses other issues, like sharing instantiated and initialized block state of the parent thread with the Lua state of each child thread. This problem is addressed in the multi-processing architecture with forking.
Block processes currently have a slightly larger memory footprint than
necessary, as other blocks and their initialized objects, e.g. buffers
allocated in initialize(), are not yet released after forking. This can be
addressed by pruning references to all unneeded blocks and associated
connectivity after forking.
Blocks cannot be manipulated at runtime, e.g. modifying their attributes or calling methods on them, since each block runs in an independent Lua state. This could be worked around by implementing RPC for these attributes and calls, but this would add substantial complexity.
Samples are typed by special data types that implement the necessary interface
to be serialized and deserialized between blocks. These data types can either
be CStruct
types, which are backed by a
C structure, or Object
types, which are
backed by a Lua object.
CStruct
types are serialized as raw contiguous samples between blocks in
their native memory representation, with no marshalling and unmarshalling.
These types must be of a fixed size, so the sample boundaries are well-defined
in the stream.
Object
types are serialized as MessagePack marshalled
bytes between blocks. These types can be variable size, may nest other Lua
types, and may have optional members.
LuaRadio has four basic types, all of which are CStruct types. These are the
ComplexFloat32, Float32, Bit, and Byte types.
They are backed by the following C structure types:

    typedef struct {
        float real;
        float imag;
    } complex_float32_t;

    typedef struct {
        float value;
    } float32_t;

    typedef struct {
        uint8_t value;
    } bit_t;

    typedef struct {
        uint8_t value;
    } byte_t;

The use of a structure to back CStruct types, rather than the raw C type
(e.g. float), is for implementation reasons. It allows the framework to
associate a metatable with the type using the LuaJIT FFI library's
ffi.metatype(), to bind useful metamethods and methods to all instances of
the type. It also makes those instances distinct from other occurrences of the
underlying data types (e.g. float32_t vs float).
Users can define their own CStruct
and Object
types and bind methods to
them. See the Creating Blocks guide for
more details and examples.
Code reference: CStruct class, Object class.
It would be inefficient for blocks to process one sample at a time, as the overhead of serializing the sample and calling the block to process it would exceed the cost of processing it. Instead, blocks operate on a vector of samples at a time, to amortize the overhead of serialization.
Vectors are dynamic arrays of a CStruct
or Object
data type. Blocks get
their inputs as vectors, and return their outputs as vectors.
Each data type provides two static methods for creating a vector of itself:
.vector(num) for a zero-initialized vector, or .vector_from_array(arr) for
a vector initialized from a Lua array. For example:

    -- ComplexFloat32 vector of length 16
    local vec = radio.types.ComplexFloat32.vector(16)

    -- Byte vector of length 10
    local vec = radio.types.Byte.vector(10)

    -- Float32 vector of length 3 from array initializer
    local vec = radio.types.Float32.vector_from_array({1.0, 2.0, 3.0})

Vectors can be resized and appended to:

    local vec = radio.types.Byte.vector(10)
    print(vec.length) --> 10

    vec:resize(5)
    print(vec.length) --> 5

    vec:append(radio.types.Byte(0xAA))
    print(vec.length) --> 6

Vectors of CStruct
typed samples are laid out contiguously in memory. The
array of samples is available under the .data
member, and its length under
the .length
member. These samples can be modified directly in Lua, or can be
passed to an external library for processing.
Resizing a CStruct
typed vector only causes a re-allocation when it is grown
to a larger size. The underlying buffer is retained on resizing to a smaller
size; just the bookkeeping is updated. This allows blocks that reuse vectors
for output samples to approach constant memory usage, as the underlying memory
of the vectors will reach a stable size for the inputs the block consumes.
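The resize behavior described above can be sketched in plain Lua. This is not LuaRadio's Vector implementation: the `capacity` and `allocations` fields are hypothetical, a Lua table stands in for the contiguous C buffer, and the sketch assumes that a re-allocation happens only when the requested length exceeds the largest size reached so far.

```lua
-- Hypothetical vector that tracks capacity separately from length.
-- A Lua table stands in for the contiguous C buffer.
local Vector = {}
Vector.__index = Vector

function Vector.new(length)
    local self = setmetatable({data = {}, length = 0, capacity = 0,
                               allocations = 0}, Vector)
    return self:resize(length)
end

function Vector:resize(length)
    if length > self.capacity then
        -- Growing past capacity: allocate a new buffer and copy.
        local data = {}
        for i = 1, self.length do data[i] = self.data[i] end
        for i = self.length + 1, length do data[i] = 0 end
        self.data, self.capacity = data, length
        self.allocations = self.allocations + 1
    end
    -- Shrinking, or growing within capacity: bookkeeping only.
    self.length = length
    return self
end

local vec = Vector.new(4)   -- first allocation
vec:resize(2)               -- no allocation
vec:resize(4)               -- no allocation, capacity is still 4
vec:resize(8)               -- second allocation
print(vec.length, vec.capacity, vec.allocations)
```

Once the capacity has grown to the largest input size the block sees, further resizes stop allocating, which is why memory usage settles in the steady state.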
All CStruct
typed vectors are allocated with page alignment, to enable
processing with libraries that require, or perform better with, aligned
buffers. This is often because SIMD operations are involved.
Vectors of Object
typed samples are stored in a Lua array, but provide a
compatible interface to CStruct
typed vectors. These vectors cannot be passed
to external libraries.
Code reference: Vector and ObjectVector classes.
Blocks are classes derived from
radio.block.Block
that implement their
functionality in the following methods:
- instantiate(...): constructor
- initialize(): initialization (optional)
- process(...): main work method
- cleanup(): clean up (optional)
The role of the instantiate()
constructor is to establish the basic state of
the block and to register its type signatures, which specify the block's
input/output port names and types.
The initialize()
method is called after the block has been connected in a
flow graph and differentiated. It allows the block to perform additional
initialization based on its differentiated type signature and its sample rate.
The process()
method is the main work method of the block. It receives input
vectors of samples as arguments, and returns output vectors of samples. This
method is called repeatedly by the framework to process inputs into outputs.
The cleanup()
method is called by the framework when the flow graph has
collapsed, for additional clean up of resources.
Source blocks and blocks that modify the sample rate must implement the
get_rate() method, which returns the block's output sample rate as a number,
in samples per second.
Code reference: Block class.
A type signature is a description of the input/output port names and data types of a block. LuaRadio blocks can support multiple type signatures, all of which must share the same input/output count and names, but may differ in data types. Blocks register their type signatures in their constructor, so that they can be connected into a flow graph after they are instantiated. The framework selects the correct type signature in its differentiation phase, described in its section below.
Code reference: Block class.
The instantiate(...)
method is called whenever a block is instantiated by
name. This method takes the arguments passed to the block on instantiation.
Blocks must register type signatures with the
add_type_signature()
method in their instantiate()
constructor. This method takes an array of
input port descriptors, followed by an array of output port descriptors. Each
port descriptor specifies a name and a data type. For example, an AddBlock
that supports both complex-valued and real-valued inputs would register:

    function AddBlock:instantiate()
        -- Real-valued type signature
        self:add_type_signature({radio.block.Input("in", radio.types.Float32)},
                                {radio.block.Output("out", radio.types.Float32)})
        -- Complex-valued type signature
        self:add_type_signature({radio.block.Input("in", radio.types.ComplexFloat32)},
                                {radio.block.Output("out", radio.types.ComplexFloat32)})
    end

Source and sink blocks may specify empty arrays for inputs or outputs, respectively, when they add type signatures. Otherwise, sources and sinks are implemented largely the same way as other blocks.
The add_type_signature() method can also be used to associate different
process() and initialize() methods with individual type signatures. These
methods are bound to the block on differentiation. See the Creating Blocks
guide for examples.
After a block is instantiated, it can be connected into a flow graph under a
CompositeBlock. This is described in more detail in the Composite Blocks
section below.
When the add_type_signature()
method is called in a block's instantiate()
constructor, it builds InputPort
and OutputPort
containers for each
specified input and output port of the block, and stores them in arrays under
the .inputs
and .outputs
members. These containers are the actual "ports"
connected in a flow graph. They contain a name and block owner, and are later
populated with their concrete data type and a shared Pipe
object.
The act of connecting two blocks in a flow graph is registering the block's
InputPort
instance as the key and the OutputPort
instance as a value in a
hash table owned by a CompositeBlock
, thereby building a graph of input and
output connections.
Code reference: Block class, InputPort, OutputPort classes, CompositeBlock class.
When a flow graph is run, each block in the flow graph is first differentiated into a compatible type signature. This differentiation starts at the source blocks and ends at the sink blocks, and is carried out in a downstream order.
A block is differentiated by its input types, using the
differentiate()
method. This method takes an array of input data types and differentiates the
block into a compatible type signature, by finding a registered type signature
with matching input data types. The method raises an error if a compatible
type signature is not found.
Type signatures may also specify a function predicate instead of a concrete
data type for input ports. For example, the JSONSink does this to accept any
data type that implements to_json(). In those cases, differentiate() calls
the function predicate with the input type and expects a boolean result to
indicate if the input type is compatible.
The result of differentiation is that the InputPort
and OutputPort
ports in
the block's .inputs
and .outputs
take on the concrete data types specified
in the selected type signature, and the block's initialize()
and process()
methods are bound to the ones specified in the type signature. The concrete
data types are available to the block with the
get_input_type()
and
get_output_type()
methods.
Since a block is differentiated by its input types, it cannot have multiple type signatures that share the same input types, as this would cause ambiguity.
After differentiation, the block's output types are well defined, and can then be used in the differentiation of downstream blocks.

    local multiply = radio.MultiplyBlock()

    -- Differentiate into the complex-valued flavor of MultiplyBlock
    multiply:differentiate({radio.types.ComplexFloat32, radio.types.ComplexFloat32})

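The matching rule itself can be sketched in plain Lua. This is a simplified stand-in rather than the framework's differentiate() implementation: strings replace data types, and the `matches` helper and the signature table layout are hypothetical. It does show both cases described above, concrete types compared for equality and function predicates called with the input type.

```lua
-- Hypothetical type signature matching. Strings stand in for data types;
-- an input entry may also be a predicate function.
local function matches(signature, input_types)
    if #signature.inputs ~= #input_types then return false end
    for i, expected in ipairs(signature.inputs) do
        local actual = input_types[i]
        if type(expected) == "function" then
            if not expected(actual) then return false end
        elseif expected ~= actual then
            return false
        end
    end
    return true
end

-- Return the first registered signature compatible with the input types.
local function differentiate(signatures, input_types)
    for _, signature in ipairs(signatures) do
        if matches(signature, input_types) then return signature end
    end
    error("no compatible type signature found")
end

local signatures = {
    {inputs = {"Float32", "Float32"}, outputs = {"Float32"}},
    {inputs = {"ComplexFloat32", "ComplexFloat32"}, outputs = {"ComplexFloat32"}},
}

local chosen = differentiate(signatures, {"ComplexFloat32", "ComplexFloat32"})
print(chosen.outputs[1])

-- Mismatched input types raise an error
local ok = pcall(differentiate, signatures, {"Float32", "Byte"})
print(ok)
```

Because the first match wins, two signatures with identical input types would make the second unreachable, which is the ambiguity the framework rules out.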
Code reference: Block class.
The initialization phase takes place after the differentiation phase in running
a flow graph. In this phase, every block's initialize()
method is called,
starting at the source blocks and ending at the sink blocks, in a downstream
order.
In the initialize()
method, the block can perform data type dependent
initialization with the
get_input_type()
and
get_output_type()
methods, which return the differentiated input and output types of the
specified port index, respectively.
The block can also perform sample rate dependent initialization with
get_rate(), which recursively calls get_rate() on upstream blocks in the
flow graph to determine the sample rate. Blocks may modify the sample rate for
downstream blocks by overriding this method, and source blocks are required to
implement it to return a concrete value.
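A minimal sketch of this recursive lookup, in plain Lua, is shown below. The `Block` table and its `upstream` field are hypothetical simplifications: in LuaRadio the lookup goes through the block's input ports, not a direct reference to another block.

```lua
-- Hypothetical blocks; `upstream` stands in for the connection to the
-- block feeding this one.
local Block = {}
Block.__index = Block

function Block.new(upstream)
    return setmetatable({upstream = upstream}, Block)
end

-- Default: inherit the sample rate of the upstream block.
function Block:get_rate()
    return self.upstream:get_rate()
end

-- A source has no upstream, so it returns a concrete rate.
local source = Block.new(nil)
function source:get_rate() return 48000 end

-- A block that halves the sample rate overrides get_rate().
local decimator = Block.new(source)
function decimator:get_rate() return self.upstream:get_rate() / 2 end

local filter = Block.new(decimator)
local sink = Block.new(filter)
print(sink:get_rate())
```

The sink and filter never state a rate of their own; the value they see is the source's rate as modified by the decimator between them.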
Most blocks will create persistent output sample vectors in initialize(), to
be used by process(...). This allows blocks to efficiently produce new
samples without excessive allocations and deallocations.
Code reference: Block class.
The block's process()
method is called repeatedly in the running phase
of a flow graph to process inputs into outputs.
This method receives a set of input vectors as arguments, corresponding to the
input ports it defined in its type signatures. These inputs are immutable,
read-only vectors and are all of the same length. The process()
method is
responsible for computing output vectors from these inputs and any block state,
and returning the output vectors in the order corresponding to the output ports
it defined in its type signatures.
Blocks are responsible for managing their output sample vectors. Most blocks
allocate persistent output sample vectors in their initialize() method, and
then resize, populate, and return them in process(...). Since vector resizes
only cause re-allocation when they are grown, the underlying memory of output
vectors approaches a stable size as the vector is resized to accommodate the
block's inputs.
For example, the process method for adding two inputs might look like:
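
    function AddBlock:process(x, y)
        -- Reuse the persistent output vector, sized to match the inputs
        local out = self.out:resize(x.length)

        -- Sample data is a zero-indexed C array
        for i = 0, x.length-1 do
            out.data[i] = x.data[i] + y.data[i]
        end

        return out
    end
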
After a flow graph is connected, differentiated, and initialized, every block is run concurrently.
While running, each block's inputs share a Pipe
object with another block's
output. A block output may have multiple Pipe
objects to several different
block inputs, but every block input only has one Pipe
object.
The Pipe
object provides an interface for the serialization and
deserialization of sample vectors between blocks. The read()
method reads a
sample vector from the pipe. The write()
method writes a sample vector to the
pipe.
Each block also has a ControlSocket
object, which provides an out-of-band
interface for block shutdown by the parent process, and in the future, will
allow the block to receive asynchronous control messages.
Both Pipe
and ControlSocket
objects use socketpair()
for IPC.
A block is run with its run()
method, which is a loop that repeatedly reads
input pipes into an array of vectors, calls process(...)
with these input
vectors, and writes the resulting array of vectors to the output pipes.
The run() method uses a PipeMux object to handle polling and reading from
all input pipes and from the ControlSocket. The PipeMux read() method uses
the poll() system call to wait for new data at the block's inputs, and then
reads and returns an array of deserialized sample vectors. It also detects
EOF on any input pipes, indicating flow graph collapse, or EOF on the
ControlSocket, indicating a shutdown event.
Code reference: Block class, PipeMux class.
A block runs indefinitely in its run()
method, until one of the input Pipe
objects reads EOF, indicating that the upstream block closed its output Pipe
and that the flow graph is collapsing, or until the ControlSocket
reads EOF,
indicating that the parent process closed its end of the ControlSocket
and
the flow graph is shutting down.
When a read from PipeMux returns an EOF or shutdown event, the block breaks
its main run loop, calls cleanup(), and then closes its output pipes. This
causes the downstream blocks to terminate similarly.
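The run loop and its termination can be sketched with an in-memory stand-in for a pipe. Everything here is hypothetical and simplified: `new_pipe` replaces the socket-backed Pipe, there is a single input and output, no ControlSocket, and representing EOF as a nil read is a choice made for this sketch.

```lua
-- Hypothetical in-memory pipe; read() returns nil once the writer has
-- closed it and all buffered vectors have been consumed.
local function new_pipe()
    local pipe = {queue = {}, closed = false}
    function pipe:write(vec) self.queue[#self.queue + 1] = vec end
    function pipe:read()
        if #self.queue > 0 then return table.remove(self.queue, 1) end
        if self.closed then return nil end
        error("would block")  -- a real pipe would wait in poll() here
    end
    function pipe:close() self.closed = true end
    return pipe
end

-- Simplified run loop for a block with one input and one output.
local function run(block, input, output)
    while true do
        local vec = input:read()
        if vec == nil then break end      -- EOF: upstream has terminated
        output:write(block:process(vec))
    end
    if block.cleanup then block:cleanup() end
    output:close()                        -- propagates EOF downstream
end

local doubler = {cleaned = false}
function doubler:process(x)
    local out = {}
    for i = 1, #x do out[i] = 2 * x[i] end
    return out
end
function doubler:cleanup() self.cleaned = true end

local input, output = new_pipe(), new_pipe()
input:write({1, 2}); input:write({3}); input:close()
run(doubler, input, output)
print(#output.queue, output.closed, doubler.cleaned)
```

Note that the input is closed before the loop starts, yet both buffered vectors are still processed before the block sees EOF and closes its own output.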
Sources that produce a finite number of samples will exit naturally after they
have produced all of their samples, triggering the collapse of the flow graph.
Sources that run indefinitely, on the other hand, can only be shut down when
the parent process closes its end of their ControlSocket.
A block shutdown does not mean samples are lost. Samples are buffered in the
underlying implementation of the Pipe, even after the producing block has
terminated, and the consuming block will only encounter EOF on an input Pipe
after all of these samples have been consumed. This allows samples from a
finite source to be processed through a flow graph to completion.
Code reference: Block class.
The CompositeBlock
is a special block
used to build and run flow graphs. It can either be used as a hierarchical
block: a composition of blocks abstracted into one block with redefined
inputs/outputs at its boundary, or as a top-level block: a composition of
blocks forming a complete flow graph that can be run.
A CompositeBlock
used for a hierarchical block builds an internal flow graph
by connecting block ports with the
connect()
method, just as a
top-level block would. However, unlike top-level blocks, which have no boundary
inputs or outputs, hierarchical blocks also specify a type signature with
add_type_signature()
for their boundary inputs and outputs.
When a CompositeBlock
adds a type signature, instead of building the
InputPort
and OutputPort
input and output ports under the .inputs
and
.outputs
arrays as a normal block would, the CompositeBlock
builds
AliasedInputPort
and AliasedOutputPort
ports, respectively. These are
special input and output ports that alias existing input and output ports
inside a flow graph.
These aliases are established in calls to connect()
, when boundary
input/output ports are connected to the input/output ports of concrete blocks
in the flow graph.
Code reference: CompositeBlock class.
A CompositeBlock
used for a top-level block builds a flow graph by connecting
block ports with the connect()
method. The data structure for the flow graph is a table that maps block
InputPort
instances to OutputPort
instances. In other words, it is a hash
table of input port to output port, for each connection between blocks in the
graph.
When a connection is made with connect() on a top-level CompositeBlock, the
InputPort and OutputPort ports are looked up by name in the blocks and
added to the connections table as a key-value pair of input port to output
port. A hierarchical block is connected in the same way, except its ports are
of AliasedInputPort and AliasedOutputPort types.
Code reference: CompositeBlock class.
After the flow graph is connected under a top-level block, it can be run with
start() or run(). Running a flow graph requires a few validation and
initialization steps before each block can begin to consume, process, and
produce samples.
First, each block referenced in the connections table is checked for unconnected inputs. Hierarchical blocks are checked recursively for unconnected inputs in their nested connections tables. If an input is unconnected, the flow graph cannot be run and an error is raised.
The connections table is then used to build an auxiliary list called the "evaluation order". This is a list of the blocks in the flow graph arranged in a downstream, dependency-free order, starting with source blocks and ending with sink blocks. Each block in this list may depend on the outputs of previous blocks in the list, but does not depend on the outputs of any successive blocks. This order is needed to correctly differentiate the flow graph, because the output types of upstream blocks become the input types to downstream blocks for differentiation.
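One way to derive such an order from an input-port-to-output-port table is sketched below in plain Lua. It is not the framework's algorithm: the `port` helper, the `owner` field layout, the string block names, and the sorted tie-breaking are all assumptions made to keep the example small and deterministic.

```lua
-- Build a downstream evaluation order from a connections table that maps
-- each input port to the output port feeding it. Ports are hypothetical
-- tables of the form {owner = block}; blocks are plain strings here.
local function build_evaluation_order(connections)
    -- Collect the upstream dependencies of every block.
    local deps, blocks = {}, {}
    local function add(block)
        if not deps[block] then
            deps[block] = {}
            blocks[#blocks + 1] = block
        end
    end
    for input, output in pairs(connections) do
        add(output.owner); add(input.owner)
        deps[input.owner][output.owner] = true
    end
    table.sort(blocks)  -- deterministic tie-breaking for the sketch

    -- Repeatedly emit blocks whose dependencies were all emitted.
    local order, placed = {}, {}
    while #order < #blocks do
        local progressed = false
        for _, block in ipairs(blocks) do
            if not placed[block] then
                local ready = true
                for dep in pairs(deps[block]) do
                    if not placed[dep] then ready = false end
                end
                if ready then
                    placed[block] = true
                    order[#order + 1] = block
                    progressed = true
                end
            end
        end
        if not progressed then error("flow graph contains a cycle") end
    end
    return order
end

local function port(owner) return {owner = owner} end
local connections = {
    [port("mixer")] = port("source"),
    [port("filter")] = port("mixer"),
    [port("sink")] = port("filter"),
}
local order = build_evaluation_order(connections)
print(table.concat(order, " -> "))  --> source -> mixer -> filter -> sink
```

Source blocks appear first because they never occur as the owner of an input port, so they have no dependencies to wait on.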
The evaluation order is visited to differentiate()
each block. Hierarchical
blocks are differentiated themselves, and then their nested flow graph is
differentiated recursively in a similar manner -- by first building an
evaluation order for their connections table, and then differentiating the
internal blocks along it. If a block does not have a matching type signature
for the input types at its ports, an error is raised. By the end of this
process, every block has a type signature selected with well-defined input and
output types.
Next, the top-level block crawls its connections table recursively to absorb
the internal connections of any hierarchical blocks. The result is a new
connections table of all the underlying concrete blocks in the flow graph,
without abstract hierarchical blocks. Each pair in this connections table is
then visited to create Pipe
objects that connect the OutputPort
and
InputPort
of the concrete blocks in the flow graph.
Now that all of the concrete blocks have been connected with Pipe
objects,
each concrete block is checked for matching sample rates across its input
ports. Sample rates are looked up recursively through the Pipe
objects, and
originate with the get_rate()
implementation in source blocks. If the input
port sample rates do not match for a block, an error is raised.
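The rate check amounts to comparing the rates seen at each input of a block. A minimal sketch, assuming the per-input rates have already been looked up and collected into an array (the `check_input_rates` function and its arguments are hypothetical):

```lua
-- Hypothetical check: every input of a block must see the same rate.
-- Returns the common rate, or raises an error on a mismatch.
local function check_input_rates(block_name, input_rates)
    for i = 2, #input_rates do
        if input_rates[i] ~= input_rates[1] then
            error("block " .. block_name ..
                  ": sample rate mismatch on input " .. i)
        end
    end
    return input_rates[1]
end

print(check_input_rates("adder", {48000, 48000}))
print(pcall(check_input_rates, "adder", {48000, 44100}))
```

The first call returns the shared rate; the second reports failure through pcall along with the error message.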
At this stage, all blocks have been differentiated and have a known sample
rate, so initialize()
is called on each block in the flow graph, including
hierarchical blocks, to carry out any block-specific initialization before
running.
Each block is then assigned a ControlSocket object, which provides an
out-of-band mechanism for the parent process to shut down blocks, and in the
near future, an asynchronous control interface to each block.
Finally, all of the Pipe
objects that connect the OutputPort
and
InputPort
of the blocks in the flow graph are initialized. Each Pipe
creates an anonymous UNIX socket pair and pre-allocates a read buffer.
The flow graph is now ready to run.
Code reference: CompositeBlock class, Pipe class.
At this stage, the flow graph is fully validated, differentiated, initialized, and is ready to run.
The CompositeBlock first blocks SIGINT and SIGCHLD signals with
sigprocmask(), so that it can later synchronously detect these signals with
sigpending() in wait(). The CompositeBlock then calls fork() for each
block.
The child process for each block closes all unneeded file descriptors, and
calls the block's main run()
method. The child process spends the majority of
its processing lifetime in this method. If this method returns naturally due to
a flow graph collapse, the child exits with exit code 0. If it returns because
of a runtime error, the child prints the error and backtrace to standard error,
and exits with exit code 1.
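The mapping from the outcome of run() to an exit code can be sketched in plain Lua with xpcall() and debug.traceback(). The `run_child` function and the `report` callback are hypothetical names. A real child process would write the message to standard error and pass the code to os.exit(), where this sketch only returns it.

```lua
-- Run a block's main loop the way a child process might: return exit
-- code 0 on a normal return, or report the error and return 1.
local function run_child(block, report)
    local ok, err = xpcall(function() return block:run() end,
                           debug.traceback)
    if ok then return 0 end
    report(err)  -- error message followed by a backtrace
    return 1
end

-- Two stand-in blocks: one returns normally, one raises a runtime error.
local healthy = {run = function() end}
local faulty = {run = function() error("bad sample") end}

local messages = {}
local function report(msg) messages[#messages + 1] = msg end

print(run_child(healthy, report))
print(run_child(faulty, report))
```

Using debug.traceback as the message handler captures the backtrace at the point of the error, before the stack unwinds.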
The parent process closes all file descriptors associated with the Pipe
objects it built, so that the blocks are the only owners of these connected
files, and can close them to signal flow graph collapse.
The parent returns back to the top-level script, where it can use status()
to
get the running status of the flow graph, wait()
to wait for the flow graph
to finish, or stop()
to stop the flow graph.
Code reference: CompositeBlock class.
The wait() method waits for a SIGINT or SIGCHLD signal. If it gets a
SIGINT signal, which indicates a user-requested exit, it calls stop() to
stop the flow graph. If it gets a SIGCHLD signal, which indicates that a block
exited, it waits on each block PID with waitpid() until the flow graph has
fully collapsed, and then unblocks the SIGINT and SIGCHLD signals.
The stop() method shuts down all blocks in the flow graph by closing every
block's ControlSocket, waits on each block PID with waitpid() until the
flow graph has fully collapsed, and then unblocks the SIGINT and SIGCHLD
signals. If a block process does not shut down within a timeout (100 ms), it
is forcibly killed with a SIGTERM signal.
The status() method checks if any block is still running with
kill(<pid>, 0). If all blocks have exited, it waits on each block PID with
waitpid(), and then unblocks the SIGINT and SIGCHLD signals.
Code reference: CompositeBlock class.