Documentation
RoQ introduces the concept of an elastic logical queue. A logical queue is composed of:
- A set of exchanges
- A monitor
The monitor tracks the state of the exchanges through a set of KPIs, such as the message throughput per second. As soon as a predefined KPI threshold is reached, the monitor starts the re-allocation process, which spawns a new exchange and relocates the most active publishers.
RoQ does not create new Exchange processes itself. It relies on a cloud service manager such as RightScale, Scalr or any other IaaS API through which VMs can be spawned.
The next picture shows how the exchanges and the monitor fit into the logical queue and how they interact with each other.
- Once the queue is located, the client asks the monitor which exchange to connect to. The monitor looks for the least loaded exchange and returns the address to bind to.
- The client binds to the specified exchange and sends the first message
- The Exchange fans the message out to the listeners
Note that key-based message routing can easily be implemented at the exchange level. The description of the publisher API can be found in the Client API wiki page.
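To illustrate the fan-out step and the key-based routing mentioned above, here is a minimal in-memory sketch. It does not use the RoQ classes or ZMQ: `SketchExchange`, `subscribe` and `publish` are hypothetical names, and matching on a key prefix is an assumption modelled on the way ZMQ subscriptions filter messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for an exchange; this is not the RoQ Exchange class.
class SketchExchange {
    // One listener together with the key prefix it subscribed to ("" = receive everything).
    private static final class Subscription {
        final String keyPrefix;
        final Consumer<String> listener;

        Subscription(String keyPrefix, Consumer<String> listener) {
            this.keyPrefix = keyPrefix;
            this.listener = listener;
        }
    }

    private final List<Subscription> subscriptions = new ArrayList<>();

    void subscribe(String keyPrefix, Consumer<String> listener) {
        subscriptions.add(new Subscription(keyPrefix, listener));
    }

    // Fans the payload out to every listener whose prefix matches the key
    // and returns the number of listeners that received it.
    int publish(String key, String payload) {
        int delivered = 0;
        for (Subscription s : subscriptions) {
            if (key.startsWith(s.keyPrefix)) {
                s.listener.accept(payload);
                delivered++;
            }
        }
        return delivered;
    }
}
```

A listener subscribed with an empty prefix behaves like a plain fan-out subscriber, while a listener subscribed with `"orders"` only sees messages whose key starts with `orders`.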
One of the most important properties of RoQ is that it can re-allocate the most active publishers to new or existing exchanges. This operation is completely transparent for the client caller and only involves the Publisher client library.
Here is the process used by the monitor to re-allocate a publisher:
- Get the host index in the exchange host list
- Check the max throughput value
- Check the limit case (1 publisher)
- Get a candidate: an exchange that is still under the max throughput limit and that has the lowest throughput
- Update the config state of the exchange candidate
- Send the relocate message to the publisher
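The candidate selection in the steps above can be sketched as follows. This is only an illustration under stated assumptions: `RelocationSketch` and `pickCandidate` are hypothetical names, the exchange state is modelled as a plain map from exchange name to throughput, and the handling of the limit case (no relocation when the overloaded exchange hosts a single publisher) is our reading of the step, not something confirmed by the RoQ code.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper; names and the Map-based state are assumptions for illustration.
class RelocationSketch {
    // Returns the exchange a publisher could be relocated to,
    // or empty when no relocation should take place.
    static Optional<String> pickCandidate(Map<String, Long> throughputByExchange,
                                          String overloadedExchange,
                                          int publishersOnOverloaded,
                                          long maxThroughput) {
        // Assumed limit case: with a single publisher, moving it would only move the load.
        if (publishersOnOverloaded <= 1) {
            return Optional.empty();
        }
        String best = null;
        long bestThroughput = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : throughputByExchange.entrySet()) {
            if (e.getKey().equals(overloadedExchange)) {
                continue; // never relocate to the exchange we are trying to relieve
            }
            long t = e.getValue();
            // Keep the exchange that is under the limit and has the lowest throughput so far.
            if (t < maxThroughput && t < bestThroughput) {
                best = e.getKey();
                bestThroughput = t;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

When the result is empty, every other exchange is already at or above the limit, which is the situation in which a new exchange would have to be provisioned.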
When a client wants to create a new logical queue, it instantiates a logical queue factory as follows:
IRoQLogicalQFactory factory = new LogicalQFactory(configServer);
factory.createQueue("queue1", host);
The factory must be given the address of a global configuration server. We can then create a queue by naming the target host on which the logical queue should start. There is one constraint: the host must be registered in the global configuration, which means that a host manager process runs on this host. This is the case for all RoQ node members.
- The Q Factory needs the host for the Qname on which the create connection has been called; it therefore asks the global configuration manager to get the host by Qname.
- Check whether the host is present.
- Contact the host via the configuration request socket to send a create queue request.
- The host manager evaluates the other queues potentially running on the host and assigns the monitor ports and exchange ports accordingly.
- The host manager then answers with the location of the newly created monitor.
- The Logical Q factory sends a registration request to the global config manager to register the monitor under the queue name.
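As a rough illustration of the creation steps above, the following in-memory sketch models the host check, the port assignment by the host manager and the registration of the monitor. None of these classes exist in RoQ: `QueueCreationSketch`, the base port numbers and the port stride per queue are all assumptions chosen for the example, and the real components talk to each other over sockets rather than through method calls.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of the creation flow; not part of the RoQ code base.
class QueueCreationSketch {
    // Assumed values, chosen only for the example.
    static final int BASE_MONITOR_PORT = 5571;
    static final int BASE_EXCHANGE_PORT = 5559;
    static final int PORTS_PER_QUEUE = 10;

    private final Set<String> registeredHosts = new HashSet<>();         // global configuration: known hosts
    private final Map<String, String> monitorByQueue = new HashMap<>();  // global configuration: queue -> monitor
    private final Map<String, String> exchangeByQueue = new HashMap<>(); // first exchange of each queue
    private final Map<String, Integer> queuesPerHost = new HashMap<>();  // host manager state

    void registerHost(String host) {
        registeredHosts.add(host);
    }

    // Returns the address of the newly created monitor.
    String createQueue(String queueName, String host) {
        // The host must be known to the global configuration.
        if (!registeredHosts.contains(host)) {
            throw new IllegalStateException("Host not registered: " + host);
        }
        if (monitorByQueue.containsKey(queueName)) {
            throw new IllegalStateException("Queue already exists: " + queueName);
        }
        // Host manager: assign ports according to the queues already running on the host.
        int running = queuesPerHost.getOrDefault(host, 0);
        int offset = running * PORTS_PER_QUEUE;
        String monitor = "tcp://" + host + ":" + (BASE_MONITOR_PORT + offset);
        String exchange = "tcp://" + host + ":" + (BASE_EXCHANGE_PORT + offset);
        queuesPerHost.put(host, running + 1);
        // Factory: register the monitor under the queue name.
        monitorByQueue.put(queueName, monitor);
        exchangeByQueue.put(queueName, exchange);
        return monitor;
    }

    String exchangeOf(String queueName) {
        return exchangeByQueue.get(queueName);
    }
}
```

With these assumed constants, a second queue created on the same host gets ports shifted by `PORTS_PER_QUEUE`, so the two queues never collide.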
- The Q Factory needs the host for the Qname on which the create connection has been called; it therefore asks the global configuration manager to get the host by Qname.
- The factory creates a connection. The "open" call on this connection starts a ConnectionManager thread that interacts with the monitor (over ZMQ) for all connection matters: which exchange to connect to, the re-allocation process, exchange crashes, etc.
- The connection manager connects to the exchange designated by the monitor in the init phase.
- The factory returns the publisher interface, which uses the connection to publish messages.
- As soon as the monitor identifies important producers, it starts the re-allocation process; the corresponding connection managers are notified and asked to connect to another exchange.
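The role of the connection manager in the steps above can be pictured with the small sketch below. It is a simplification under stated assumptions: `ConnectionManagerSketch` and its methods are hypothetical, the monitor messages are reduced to two method calls, and no thread or ZMQ socket is involved. The point is only that the publisher keeps calling `publish` while the target exchange is swapped underneath it.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simplification of a connection manager; not the RoQ ConnectionManager class.
class ConnectionManagerSketch {
    // The exchange the publisher is currently connected to (null until the init phase is done).
    private final AtomicReference<String> currentExchange = new AtomicReference<>();

    // Called when the monitor answers the initial "which exchange?" request.
    void onInit(String exchangeAddress) {
        currentExchange.set(exchangeAddress);
    }

    // Called when the monitor sends a relocate message.
    // Returns true if the connection actually moved to a different exchange.
    boolean onRelocate(String newExchangeAddress) {
        String previous = currentExchange.getAndSet(newExchangeAddress);
        return !newExchangeAddress.equals(previous);
    }

    // What the publisher interface would call; here it only reports where the message would go.
    String publish(String message) {
        String target = currentExchange.get();
        if (target == null) {
            throw new IllegalStateException("Connection has not been opened");
        }
        return target + " <- " + message;
    }
}
```

Because the address is held in an `AtomicReference`, a relocation triggered from a monitor-facing thread is immediately visible to the thread that publishes.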
When a Global Configuration Manager (GCM) starts, it opens a port for a client interface. It also runs a Management Controller thread, which opens its own BSON-encoded client interface. To manage queues, for instance to start or stop them, the management server maintains a management view of the RoQ cluster in ZooKeeper. As a result, the Global Configuration Manager itself is responsible for maintaining topology information in ZooKeeper, while the Management Controller is responsible for performing the requested admin tasks.
As an example, to create a queue on a given host machine, one must send a BSON_CONFIG_CREATE_QUEUE message specifying the queue name and the target host address to the Management Controller interface (port 5003 by default). The Management Controller asks the host to start the desired queue and then sends a message to the GCM's own interface (port 5000 by default). Lastly, the GCM stores in ZooKeeper the topology information that corresponds to the newly created queue.
Any client can subscribe to tcp://[GCM address]:5005 with a ZMQ subscriber socket and periodically receive the configuration encoded in BSON. For further information about ZMQ subscriptions, we advise the reader to have a quick look at http://www.zeromq.org/intro:read-the-manual.
As mentioned, we use the MongoDB Java driver for BSON (https://github.com/mongodb/mongo-java-driver). Examples of encoding and decoding can be found in org.roqmessaging.management.server.UnitTestManagement.java in the Simulation module.
Basically, we send a multi-part message envelope through a ZMQ publisher. The message is composed of 3 parts:
- The CMD_ID: 1500 for MNGT_UPDATE_CONFIG as defined in RoQConstants.java
- The Queues: the list of Queue states encoded in BSON
{ "Queues" : [ { "Name" : "queue1" , "Host" : "127.0.1.1" , "State" : false} , { "Name" : "queue2" , "Host" : "127.0.1.2" , "State" : false} , { "Name" : "queue3" , "Host" : "127.0.1.3" , "State" : false}]}
- The Hosts: the list of host addresses encoded in BSON
{ "hosts" : [ "127.0.1.1" , "127.0.1.2" , "127.0.1.3" , "127.0.1.4"]}
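The layout of the three-part envelope above can be sketched with the standard library alone. This is a deliberately simplified stand-in: the documents are rendered as JSON text instead of real BSON (actual code would use the BSON encoder of the MongoDB Java driver), the command id is sent as a UTF-8 string, which is an assumption about the wire format, and `ConfigEnvelopeSketch`, `QueueState` and `envelope` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the envelope layout; JSON text stands in for BSON here.
class ConfigEnvelopeSketch {
    static final int MNGT_UPDATE_CONFIG = 1500; // value quoted above from RoQConstants.java

    static final class QueueState {
        final String name;
        final String host;
        final boolean running;

        QueueState(String name, String host, boolean running) {
            this.name = name;
            this.host = host;
            this.running = running;
        }
    }

    // Renders the "Queues" document in the same shape as the example above.
    static String queuesDocument(List<QueueState> queues) {
        StringBuilder sb = new StringBuilder("{ \"Queues\" : [");
        for (int i = 0; i < queues.size(); i++) {
            QueueState q = queues.get(i);
            if (i > 0) sb.append(" ,");
            sb.append(" { \"Name\" : \"").append(q.name)
              .append("\" , \"Host\" : \"").append(q.host)
              .append("\" , \"State\" : ").append(q.running).append("}");
        }
        return sb.append("]}").toString();
    }

    // Renders the "hosts" document in the same shape as the example above.
    static String hostsDocument(List<String> hosts) {
        StringBuilder sb = new StringBuilder("{ \"hosts\" : [");
        for (int i = 0; i < hosts.size(); i++) {
            if (i > 0) sb.append(" ,");
            sb.append(" \"").append(hosts.get(i)).append("\"");
        }
        return sb.append("]}").toString();
    }

    // Builds the three frames in order: command id, queues, hosts.
    static List<byte[]> envelope(List<QueueState> queues, List<String> hosts) {
        List<byte[]> frames = new ArrayList<>();
        frames.add(String.valueOf(MNGT_UPDATE_CONFIG).getBytes(StandardCharsets.UTF_8));
        frames.add(queuesDocument(queues).getBytes(StandardCharsets.UTF_8));
        frames.add(hostsDocument(hosts).getBytes(StandardCharsets.UTF_8));
        return frames;
    }
}
```

With a ZMQ publisher socket, the first two frames would be sent with the "send more" flag and the last one without it, so that subscribers receive the three parts as one message.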
# Configuration files
We can customize RoQ deployments by editing the two main configuration files in ROQ_HOME/config. Check the page Configuration files for more details.
Next steps: implementing the request interface of the management console.
The RoQ code is organised in modules and, as the project is still in the incubation phase, other modules will arrive. This section describes the main existing modules. The following picture shows the module dependencies and inheritance.
This is the core of the messaging system. It contains the Exchange, the publisher and subscriber client libraries, the monitor and all the timer tasks used for measuring the throughput.
This module contains all the main methods needed to launch a complete simulation environment. The primary objective of the Simulation module is to provide an easy way to test and evaluate RoQ performance.
This module contains the super POM and the common configuration for all modules.
This module will contain all the APIs that clients, either publishers or subscribers, need. A specific JAR file will therefore be created from this module.
This module will contain all the objects needed to manage instances of RoQ elements. It will offer a management API to provision and remove exchange instances and to get information about throughput or active publishers.