Slaves: failure detection + recovery process #169

Open
wants to merge 24 commits into base: develop

Changes from all commits · 24 commits
fb1aedd
HCM code refactoring
vanmellebenjamin Mar 26, 2015
1c0756f
Process monitoring system (inspired by Storm) + HCM refact + all new …
vanmellebenjamin Mar 26, 2015
2f1e206
Hb mecanism for local process + recover process for Monitor
vanmellebenjamin Mar 27, 2015
48c4cdf
hb monitoring on all the processes
vanmellebenjamin Mar 30, 2015
98aa841
All the processes are monitored and recovered by the HCM ! Tests impl…
vanmellebenjamin Mar 31, 2015
c68c04f
Refactoring + tests
vanmellebenjamin Mar 31, 2015
ff61b6b
small bug corrections etc..
vanmellebenjamin Apr 1, 2015
864274b
Service discovery via zookeeper to montitor the HCMs from the GCM + d…
vanmellebenjamin Apr 8, 2015
59998bd
implementation of the backup monitors (not already tested) + lot of s…
vanmellebenjamin Apr 9, 2015
3827a9c
detect node failure, remove queue and standby monitors, stop process …
vanmellebenjamin Apr 10, 2015
23178ee
removed log files
vanmellebenjamin Apr 10, 2015
60d9fd5
refact
vanmellebenjamin Apr 10, 2015
e6fe66f
wip: failover from active to stdby monitor + Replication factor maint…
vanmellebenjamin Apr 11, 2015
2087057
failover on standby monitor OK
vanmellebenjamin Apr 12, 2015
5cc2b4b
comments + replication factor maintained and tested + WIP: PUB/SUB co…
vanmellebenjamin Apr 12, 2015
523e072
monitor recovery process, not tested
vanmellebenjamin Apr 13, 2015
7e0b72b
advanced test for monitor failover + corrections
vanmellebenjamin Apr 13, 2015
a7fefd6
scaling factor in pub/sub can vary
vanmellebenjamin Apr 13, 2015
5af0d30
corrections + test to check if the pub/sub reconnect automatically on…
vanmellebenjamin Apr 13, 2015
ca2199e
added files to configure zookeeper and its config file inside the con…
vanmellebenjamin Apr 16, 2015
0318e72
fix for the pull requests
vanmellebenjamin Jun 18, 2015
03cf8ce
deployment scrips update
vanmellebenjamin Aug 10, 2015
57fa9e2
alex modifs for deployment scripts
vanmellebenjamin Aug 10, 2015
a14bb10
update the comments in accord with the comments from the pull request…
vanmellebenjamin Aug 13, 2015
10 changes: 6 additions & 4 deletions README.md
@@ -12,6 +12,8 @@ You want to just get a try of ROQ ? Go to the 'demonstration' section, you will
Do you want to contribute to ROQ? Go to the 'local deployment' section to deploy ROQ on your machine for debugging.
Get ready for production? Go to the 'production' section; we provide an automatic deployment script that deploys ROQ on Amazon (we plan to support other environments, stay tuned).

/!\ Some scripts require elevated rights; if you can, run them with sudo.

Demonstration
-------------

@@ -30,7 +32,7 @@ This procedure allows you to run ROQ on your local machine. All the ROQ componen

Clone this git repository on your machine. And run the following bash script:
```
RoQ/roq-deployment/demo-start-subscriber.sh
RoQ/roq-deployment/demonstration/demo-start-subscriber.sh
```
Once the terminal shows that the subscriber is connected, open a second terminal and run this script:
```
@@ -105,7 +107,7 @@ Ready for the deployment of your application in the cloud ? We provide an amazon
### Prerequisite (get these packages via yum or apt-get):
- ansible (tested with version 1.8.4)

### Configuration step
### Configuration steps

First, you must set environment variables to allow ansible to communicate with your Amazon account:
Run the following commands in the shell:
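The commands themselves are collapsed in this diff view. A typical setup, assuming the standard AWS credential environment variables that Ansible's EC2 modules read (the values below are hypothetical placeholders — substitute your own keys), looks like:

```shell
# Hypothetical placeholder values -- substitute your own Amazon credentials.
export AWS_ACCESS_KEY_ID="AKIAEXAMPLEKEY"
export AWS_SECRET_ACCESS_KEY="exampleSecretKey"
```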
@@ -129,10 +131,10 @@ Don't forget to set the key_path var to the value of your amazon ssh pem key (th
You are ready to run your first ROQ cluster on Amazon!
Run the following script:
```
ansible-playbook "PATH TO ROQ"/roq-deployment/ --skip-tags "demonstration"
./"PATH TO ROQ"/roq-deployment/amazon/startCluster.sh
```

Note: If you run several times this script, the instances number stay fixed to the values that you set in the config file.
Note: this script is idempotent; if you run it several times, the number of instances stays fixed at the values you set in the config file.

Your cluster is ready!

@@ -11,6 +11,15 @@ public interface IRoQQueueManagement {
*/
public boolean createQueue(String queueName) throws IllegalStateException, ConnectException;

/**
* Creates a logical queue automatically on the given host.
* @param queueName the queue name
* @param targetHost the address on which the master monitor is installed
* @throws IllegalStateException if a queue already exists with this name; the name must be unique
* across the complete cluster.
*/
public boolean createQueue(String queueName, String targetHost) throws IllegalStateException, ConnectException;
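The new overload pins the queue's master monitor to a chosen host. A minimal sketch of the contract — cluster-wide unique names, `IllegalStateException` on duplicates — using a hypothetical in-memory stand-in (the class below is illustrative only, not part of RoQ):

```java
import java.net.ConnectException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory stand-in for IRoQQueueManagement, used only to
// illustrate the contract: queue names must be unique cluster-wide.
class InMemoryQueueManagement {
    private final Set<String> queues = new HashSet<String>();

    public boolean createQueue(String queueName, String targetHost)
            throws IllegalStateException, ConnectException {
        if (!queues.add(queueName)) {
            // Name already taken somewhere in the cluster
            throw new IllegalStateException("Queue already exists: " + queueName);
        }
        // A real implementation would contact the GCM and start the master
        // monitor on targetHost; here we only record the name.
        return true;
    }
}
```

Creating `"q1"` twice, even with a different `targetHost`, must fail with `IllegalStateException`.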

/**
* Removes a logical queue.
* @param queueName the name of the queue to remove.
@@ -14,6 +14,8 @@
*/
package org.roqmessaging.clientlib.factory;

import java.util.ArrayList;

/**
* Interface IRoQLogicalQueueFactory
* <p> Description: represents the Global monitor manager. As those manager are intended to be stateless, a
@@ -31,7 +33,7 @@ public interface IRoQLogicalQueueFactory {
* @throws IllegalStateException if a queue already exists with this name; the name must be unique for
* the complete cluster.
*/
public int createQueue(String queueName, String targetAddress, boolean recoveryMod) throws IllegalStateException;
public int createQueue(String queueName, String targetAddress, ArrayList<String> backupHosts, boolean recoveryMod) throws IllegalStateException;

/**
* Removes a logical queue.
@@ -78,4 +80,22 @@ public interface IRoQLogicalQueueFactory {
*/
public void clean();

/**
* Creates a backup monitor for a given queue on a given host and registers
* its meta data in the GCM.
* @param queue the queue name
* @param queueBackupMonitor the host on which the backup monitor must be created
* @param hcmLost the lost HCM, or null if this is just a backup monitor creation
*/
public boolean createBackupMonitor(String queue, String queueBackupMonitor, String hcmLost);

/**
* Activates a backup monitor on a given host and updates the meta data
* in the GCM.
* @param queue the queue name
* @param hcmAddress the address of the HCM hosting the backup monitor to activate
*/
public boolean failoverOnBackupMonitor(String queue, String hcmAddress);

}
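Together the two methods above imply a simple life cycle: a backup monitor is registered for a queue, and failover promotes it to active. A minimal sketch of that state machine under stated assumptions (class and field names below are illustrative, not RoQ's actual GCM implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the backup-monitor life cycle implied by
// IRoQLogicalQueueFactory: one active monitor plus one backup per queue,
// where failover promotes the backup registered on the given HCM address.
class MonitorRegistry {
    private final Map<String, String> activeMonitor = new HashMap<String, String>();
    private final Map<String, String> backupMonitor = new HashMap<String, String>();

    public boolean createBackupMonitor(String queue, String backupHost, String hcmLost) {
        // If hcmLost is non-null, a real implementation would also replace
        // the replica that lived on the lost HCM.
        backupMonitor.put(queue, backupHost);
        return true;
    }

    public boolean failoverOnBackupMonitor(String queue, String hcmAddress) {
        String backup = backupMonitor.get(queue);
        if (backup == null || !backup.equals(hcmAddress)) {
            return false; // no backup monitor registered on that host
        }
        backupMonitor.remove(queue);
        activeMonitor.put(queue, backup); // promote the backup to active
        return true;
    }

    public String getActiveMonitor(String queue) {
        return activeMonitor.get(queue);
    }
}
```

Failover only succeeds against the host where a backup was actually registered, and a promoted backup cannot be promoted twice.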
202 changes: 137 additions & 65 deletions roq-core/src/main/java/org/roqmessaging/core/Exchange.java
@@ -16,6 +16,7 @@

package org.roqmessaging.core;

import java.io.IOException;
import java.util.HashMap;
import java.util.Timer;

@@ -24,7 +25,10 @@
import org.roqmessaging.core.interfaces.IStoppable;
import org.roqmessaging.core.timer.ExchangeStatTimer;
import org.roqmessaging.core.timer.Heartbeat;
import org.roqmessaging.core.utils.RoQUtils;
import org.roqmessaging.state.ProducerState;
import org.roqmessaging.utils.LocalState;
import org.roqmessaging.utils.Time;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;

@@ -46,6 +50,7 @@ public class Exchange implements Runnable, IStoppable {
private ZMQ.Socket backendPub;
private ZMQ.Socket monitorPub;
private ZMQ.Socket pubInfoRep;
private ZMQ.Socket monitorInfoRep;
private String s_frontend;
private String s_backend;
private String s_monitor;
@@ -55,6 +60,10 @@
private Timer timer = null;
private volatile boolean active=false;
private String ID = null;
//Local State for heartbeats
private LocalState localState;
// Minimum time between two heartbeats (in millis)
private long hbPeriod;

//Shutdown thread
private ShutDownMonitor shutDownMonitor = null;
@@ -63,61 +72,73 @@
private long timeout=80;

/**
* Notice that we start a shutdown request socket on frontEnd port +1
* Notice that we start a shutdown request socket on frontEnd port +1
* @param frontend the front port
* @param backend the back port
* @param monitorHost the address of the monitor to bind tcp:// monitor:monitorPort;
* @param statHost tcp://monitor:statport
* @param localStatePath the folder path in which the process states (heartbeats) will be stored
* @param hbPeriod the minimum time between two heartbeats, in milliseconds
*/
Collaborator comment:
Can you please update this comment?
public Exchange(int frontend, int backend, String monitorHost, String statHost) {
knownProd = new HashMap<String, ProducerState>();
this.statistic = new StatDataState();
this.statistic.setProcessed(0);
this.statistic.setThroughput(0);
this.statistic.setStatHost(statHost);
this.statistic.setMax_bw( 5000); // bandwidth limit, in bytes/minute, per producer
this.s_frontend = "tcp://*:" + frontend;
this.s_backend = "tcp://*:" + backend;
this.s_monitor = monitorHost;

this.context = ZMQ.context(1);
this.frontendSub = context.socket(ZMQ.SUB);
this.backendPub = context.socket(ZMQ.PUB);

this.ID = "XChange "+System.currentTimeMillis();

// Caution, the following method as well as setSwap must be invoked before binding
// Use these to (double) check if the settings were correctly set
// logger.info(this.backend.getHWM());
// logger.info(this.backend.getSwap());
setSocketOptions(this.backendPub);
setSocketOptions(this.frontendSub);

this.frontendSub.bind(s_frontend);
this.frontendSub.subscribe("".getBytes());

this.backendPub.bind(s_backend);
this.monitorPub = context.socket(ZMQ.PUB);

//The channel on which the publisher will notifies their deconnection
this.pubInfoRep = context.socket(ZMQ.REP);
this.pubInfoRep.bind("tcp://*:" +(backend+2));

this.monitorPub.connect(s_monitor);
this.frontEnd=frontend;
this.backEnd= backend;
this.active = true;

if(logger.isInfoEnabled()){
logger.info("BackendSub: SndHWM="+this.backendPub.getSndHWM()+" RcvHWM="+this.backendPub.getRcvHWM());
logger.info("FrontendSub: SndHWM="+this.frontendSub.getSndHWM()+" RcvHWM="+this.frontendSub.getRcvHWM());
}
public Exchange(int frontend, int backend, String monitorHost, String statHost, String localStatePath, long hbPeriod) {
try {
knownProd = new HashMap<String, ProducerState>();
this.statistic = new StatDataState();
this.statistic.setProcessed(0);
this.statistic.setThroughput(0);
this.statistic.setStatHost(statHost);
this.statistic.setMax_bw( 5000); // bandwidth limit, in bytes/minute, per producer
this.s_frontend = "tcp://*:" + frontend;
this.s_backend = "tcp://*:" + backend;
this.s_monitor = monitorHost;
localState = new LocalState(localStatePath + "/" + frontend);
this.hbPeriod = hbPeriod;
this.context = ZMQ.context(1);
this.frontendSub = context.socket(ZMQ.SUB);
this.backendPub = context.socket(ZMQ.PUB);


//initiatlisation of the shutdown thread
this.shutDownMonitor = new ShutDownMonitor(backend+1, this);
new Thread(shutDownMonitor).start();
logger.debug("Started shutdown monitor on "+ (backend+1));
this.ID = "XChange "+System.currentTimeMillis();

// Caution, the following method as well as setSwap must be invoked before binding
// Use these to (double) check if the settings were correctly set
// logger.info(this.backend.getHWM());
// logger.info(this.backend.getSwap());
setSocketOptions(this.backendPub);
setSocketOptions(this.frontendSub);

this.frontendSub.bind(s_frontend);
this.frontendSub.subscribe("".getBytes());

this.backendPub.bind(s_backend);
this.monitorPub = context.socket(ZMQ.PUB);

//The channel on which the publishers notify their disconnection
this.pubInfoRep = context.socket(ZMQ.REP);
this.pubInfoRep.bind("tcp://*:" +(backend+2));

//The channel on which the monitor notifies an active-monitor change
this.monitorInfoRep = context.socket(ZMQ.REP);
this.monitorInfoRep.bind("tcp://*:" +(backend+3));

this.monitorPub.connect(s_monitor);
this.frontEnd=frontend;
this.backEnd= backend;
this.active = true;

if(logger.isInfoEnabled()){
logger.info("BackendSub: SndHWM="+this.backendPub.getSndHWM()+" RcvHWM="+this.backendPub.getRcvHWM());
logger.info("FrontendSub: SndHWM="+this.frontendSub.getSndHWM()+" RcvHWM="+this.frontendSub.getRcvHWM());
}


//initialisation of the shutdown thread
this.shutDownMonitor = new ShutDownMonitor(backend+1, this);
new Thread(shutDownMonitor).start();
logger.debug("Started shutdown monitor on "+ (backend+1));
} catch (Exception e) {
logger.error("Error while creating Exchange, aborted", e);
return;
}
}

private void setSocketOptions(Socket sock) {
@@ -173,12 +194,26 @@ public void run() {
//It is important that the exchange stat timer is triggered every minute, since it computes throughput in bytes/min.
timer.schedule(exchStatTimer, 100, 60000);
int part;
long current;
long lastHb = Time.currentTimeMillis() - hbPeriod;
String prodID= null;
//Adding the poller
ZMQ.Poller poller = new ZMQ.Poller(2);
poller.register(this.frontendSub);
poller.register(this.pubInfoRep);
poller.register(this.monitorInfoRep);
while (this.active) {
// Write Heartbeat
if ((Time.currentTimeMillis() - lastHb) >= hbPeriod) {
try {
current = Time.currentTimeSecs();
logger.info("Exch Writing hb " + frontEnd + " " + current);
localState.put("HB", current);
lastHb = Time.currentTimeMillis();
} catch (IOException e) {
logger.info("Failed to write in local db: " + e);
}
}
byte[] message;
part = 0;
//Set the poll timeout; it returns either when something arrives or when it times out
@@ -189,34 +224,70 @@
* ** Message multi part construction ** 1: routing key 2:
* producer ID 3: payload
*/

message = frontendSub.recv(0);
part++;
if (part == 2) {
prodID=bytesToStringUTFCustom(message);
prodID = bytesToStringUTFCustom(message);
}
if (part == 3) {
logPayload(message.length, prodID);
}
backendPub.send(message, frontendSub.hasReceiveMore() ? ZMQ.SNDMORE : 0);
} while (this.frontendSub.hasReceiveMore() && this.active);
this.statistic.processed++;
}else{
if(poller.pollin(1)){
//A publisher sends a disconnection event
byte[] info = pubInfoRep.recv(0);
String mInfo = new String(info);
String[] arrayInfo = mInfo.split(","); //CODE, ID
if(knownProd.remove(arrayInfo[1])!=null){
logger.info("Successfully removed publisher "+arrayInfo[1] +" remains "+ knownProd.size() + " publishers.");
this.pubInfoRep.send(Integer.toString(RoQConstant.OK).getBytes(), 0);
}else{
logger.warn("The publisher "+ arrayInfo[1]+" is not known");
this.pubInfoRep.send(Integer.toString(RoQConstant.FAIL).getBytes(), 0);
}
} else if(poller.pollin(1)) {
//A publisher sends a disconnection event
byte[] info = pubInfoRep.recv(0);
String mInfo = new String(info);
String[] arrayInfo = mInfo.split(","); //CODE, ID
logger.info("Unregistering: " + arrayInfo[1]);
if(knownProd.remove(arrayInfo[1])!=null){
logger.info("Successfully removed publisher "+arrayInfo[1] +" remains "+ knownProd.size() + " publishers.");
this.pubInfoRep.send(Integer.toString(RoQConstant.OK).getBytes(), 0);
}else{
logger.warn("The publisher "+ arrayInfo[1]+" is not known");
this.pubInfoRep.send(Integer.toString(RoQConstant.FAIL).getBytes(), 0);
}
} else if (poller.pollin(2)) { // Received a message from the monitor
byte[] info = monitorInfoRep.recv(0);
String mInfo = new String(info);
String[] arrayInfo = mInfo.split(","); //CODE, MONITOR_ADDR, STAT_ADDR
if (Integer.parseInt(arrayInfo[0]) == RoQConstant.EVENT_MONITOR_CHANGED) {
logger.info("Receiving monitor change event: " + arrayInfo[0] +
" mon: " + arrayInfo[1] + " stat: " + arrayInfo[2]);
// Stop the current timers, they use old addresses
exchStatTimer.shutDown();
heartBeatTimer.shutDown();
timer.purge();
timer.cancel();

// set the new addresses
this.statistic.setStatHost(arrayInfo[2]);
this.s_monitor = arrayInfo[1];

// restart the timer with the new parameters
timer = new Timer();
heartBeatTimer = new Heartbeat(this.s_monitor, this.frontEnd, this.backEnd );
timer.schedule(heartBeatTimer, 5, 2000);
exchStatTimer = new ExchangeStatTimer(this, this.statistic);
timer.schedule(exchStatTimer, 100, 60000);

monitorInfoRep.send((
RoQUtils.getInstance().getLocalIP() + "," +
this.frontEnd + "," +
this.backEnd
).getBytes(), 0);
logger.info("The exchange has failed over to the new active monitor");
}
}
}
try {
// 0 indicates that the process has been shut down by the user and has not timed out
localState.put("HB", new Long(0));
} catch (IOException e) {
logger.error("Failed to stop the process properly; it will be restarted...");
e.printStackTrace();
}
closeSockets();
exchStatTimer.shutDown();
heartBeatTimer.shutDown();
@@ -249,7 +320,8 @@ private void closeSockets() {
frontendSub.close();
backendPub.close();
monitorPub.close();

monitorInfoRep.close();
pubInfoRep.close();
}

/**
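The run loop above writes a seconds-resolution timestamp under the `"HB"` key every `hbPeriod` milliseconds, and writes `0` on clean shutdown. The supervisor side of this protocol — not shown in this diff — can then classify a process from its last heartbeat. A minimal sketch under those assumptions (class, enum, and threshold names are illustrative):

```java
// Hypothetical sketch of the supervisor side of the heartbeat protocol:
// a monitor reads the last "HB" timestamp (in seconds) from the process's
// LocalState and decides whether the process is alive, was stopped cleanly
// by the user (sentinel value 0, so it must not be restarted), or timed out.
class HeartbeatChecker {
    static final long CLEAN_SHUTDOWN = 0L;

    enum Status { ALIVE, CLEANLY_STOPPED, TIMED_OUT }

    static Status check(long lastHbSecs, long nowSecs, long timeoutSecs) {
        if (lastHbSecs == CLEAN_SHUTDOWN) {
            return Status.CLEANLY_STOPPED; // stopped by the user, do not restart
        }
        // Stale heartbeat beyond the timeout means the process is presumed dead
        return (nowSecs - lastHbSecs) > timeoutSecs ? Status.TIMED_OUT : Status.ALIVE;
    }
}
```

With a 10-second timeout, a heartbeat written at t=100 is still alive at t=105, timed out at t=120, and the sentinel `0` is always reported as a clean stop.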