Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding missing protobuf dependency #8

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions conf/dependencies.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ require:
- play -> console
- com.rabbitmq -> amqp-client 2.5.0
- org.codehaus.jackson -> jackson-mapper-asl 1.5.1
- com.google.protobuf -> protobuf-java 2.4.1


36 changes: 17 additions & 19 deletions src/play/modules/rabbitmq/RabbitMQPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,22 @@
*/
package play.modules.rabbitmq;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.apache.commons.lang.StringUtils;

import play.Logger;
import play.Play;
import play.PlayPlugin;
import play.modules.rabbitmq.stats.StatsService;
import play.modules.rabbitmq.util.ExceptionUtil;
import play.modules.rabbitmq.util.MsgMapper;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

// TODO: Auto-generated Javadoc
/**
Expand Down Expand Up @@ -157,7 +155,7 @@ protected Channel createChannel() {
int attempts = 0;
while (true) {
attempts++;
Logger.info("Attempting to connect to queue: attempt " + attempts);
Logger.trace("Attempting to connect to queue: attempt " + attempts);
try {
Connection connection = this.getConnection();
channel = connection.createChannel();
Expand All @@ -180,18 +178,18 @@ protected Channel createChannel() {
* @param queue
* the queue
* @return the channel
* @throws Exception
* @throws IOException
* the exception
*/
public Channel createChannel(String queue, String routingKey) throws Exception {
public Channel createChannel(String queue, String routingKey) throws IOException {
// Counter that keeps track of number of retries
int attempts = 0;

// Get Plugin
RabbitMQPlugin plugin = Play.plugin(RabbitMQPlugin.class);

// Log Debug
Logger.info("Initializing connections to RabbitMQ instance (%s:%s), Queue: %s", RabbitMQPlugin.getHost(), RabbitMQPlugin.getPort(), queue);
Logger.trace("Initializing connections to RabbitMQ instance (%s:%s), Queue: %s", RabbitMQPlugin.getHost(), RabbitMQPlugin.getPort(), queue);

// Create Channel
Channel channel = this.createChannel();
Expand All @@ -208,7 +206,7 @@ public Channel createChannel(String queue, String routingKey) throws Exception {
attempts++;

// Log Debug
Logger.debug("Retry " + attempts);
Logger.trace("Retry " + attempts);

// Get Next Delivery Message
try {
Expand All @@ -217,12 +215,12 @@ public Channel createChannel(String queue, String routingKey) throws Exception {
// String queueName = channel.queueDeclare().getQueue();
// channel.queueBind(queueName, exchangeName, routingKey);

channel.exchangeDeclare(queue, plugin.getExchangeType(), true);
channel.queueDeclare(queue, plugin.isDurable(), false, false, null);
channel.exchangeDeclare(queue, RabbitMQPlugin.getExchangeType(), true);
channel.queueDeclare(queue, RabbitMQPlugin.isDurable(), false, false, null);
channel.queueBind(queue, queue, routingKey);

// Log Debug
Logger.info("RabbitMQ Task Channel Available: " + channel);
Logger.trace("RabbitMQ Task Channel Available: " + channel);

// Return Channel
return channel;
Expand Down Expand Up @@ -404,4 +402,4 @@ public static String getVhost() {
public Connection getConnection() throws IOException {
return factory.newConnection();
}
}
}
97 changes: 42 additions & 55 deletions src/play/modules/rabbitmq/consumer/RabbitMQConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
*/
package play.modules.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import play.Logger;
import play.Play;
import play.exceptions.UnexpectedException;
import play.jobs.Job;
import play.modules.rabbitmq.RabbitMQPlugin;
import play.modules.rabbitmq.util.ExceptionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;

// TODO: Auto-generated Javadoc
/**
Expand All @@ -41,6 +43,7 @@ public abstract class RabbitMQConsumer<T> extends Job<T> {
* application.conf (please override if you need a new value)
*/
public static int retries = RabbitMQPlugin.retries();
DeliveryTag unacknowledgedDeliveryTag;

/**
* Consume.
Expand All @@ -56,33 +59,31 @@ public abstract class RabbitMQConsumer<T> extends Job<T> {
* @param plugin
* the plugin
* @return the channel
* @throws Exception
* @throws IOException
* the exception
*/
protected Channel createChannel(RabbitMQPlugin plugin) throws Exception {
protected Channel createChannel(RabbitMQPlugin plugin) throws IOException {
// Get Plugin
Channel channel = plugin.createChannel(this.queue(), this.routingKey());
return channel;
}

/**
* Creates the channel.
* Creates the consumer.
*
* @param channel
* the channel
* @param plugin
* the plugin
* @return the channel
* @throws Exception
* the exception
* @throws IOException
* if calling basicConsume throws
*/
protected QueueingConsumer createConsumer(Channel channel, RabbitMQPlugin plugin) throws Exception {
protected QueueingConsumer createConsumer(Channel channel) throws IOException {
// Get Plugin
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(this.queue(), plugin.isAutoAck(), consumer);
channel.basicConsume(this.queue(), RabbitMQPlugin.isAutoAck(), consumer);

// Log Debug
Logger.info("RabbitMQ Consumer - Channel: %s, Consumer: %s " + channel, consumer);
Logger.trace("RabbitMQ Consumer - Channel: %s, Consumer: %s ", channel, consumer);

// Return Channel
return consumer;
Expand All @@ -95,8 +96,12 @@ protected QueueingConsumer createConsumer(Channel channel, RabbitMQPlugin plugin
*/
@Override
public void doJob() {
this.goGetHerSon();
}
try {
this.goGetHerSon();
} catch (IOException e) {
throw new UnexpectedException(e);
}
}

/**
* Gets the message type.
Expand All @@ -108,37 +113,26 @@ public void doJob() {
/**
* Go get her son.
*/
private void goGetHerSon() {
private void goGetHerSon() throws IOException {
// Get Plugin
RabbitMQPlugin plugin = Play.plugin(RabbitMQPlugin.class);

// Define Channel
Channel channel = null;
QueueingConsumer consumer = null;
Channel channel = this.createChannel(plugin);
QueueingConsumer consumer = this.createConsumer(channel);
Long deliveryTag = null;

// Get Channel
while (true) {
// Log Debug
Logger.info("Entering main loop on consumer: " + this);
Logger.trace("Entering main loop on consumer: " + this);

// Are Consumers Running?
boolean active = RabbitMQPlugin.areConsumersActive();

// Only do work if consumers are running
if (active) {
try {
// Create Channel
if (channel == null || (channel != null && channel.isOpen() == false)) {
consumer = null;
channel = this.createChannel(plugin);
}

// Create Consumer
if (consumer == null) {
consumer = this.createConsumer(channel, plugin);
}

// Get Task
QueueingConsumer.Delivery task = consumer.nextDelivery();

Expand All @@ -149,6 +143,7 @@ private void goGetHerSon() {
// consumer,
// ack the queue and do the retry logic
deliveryTag = task.getEnvelope().getDeliveryTag();
unacknowledgedDeliveryTag = new DeliveryTag(deliveryTag, channel);
T message = this.toObject(task.getBody());
new RabbitMQMessageConsumerJob(channel, deliveryTag, this.queue(), this, message, this.retries()).doJobWithResult();

Expand All @@ -167,29 +162,6 @@ private void goGetHerSon() {
Logger.error(ExceptionUtil.getStackTrace(t));
}

} finally {
if (channel != null) {
// Now tell Daddy everything is cool
try {
if (deliveryTag != null && channel.isOpen()) {
channel.basicAck(deliveryTag, false);
}
} catch (Throwable e) {
Logger.error(ExceptionUtil.getStackTrace("Error doing a basicAck for tag: " + deliveryTag, e));
}
try {
if (channel.getConnection() != null && channel.getConnection().isOpen()) {
channel.getConnection().close();
}
if (channel.isOpen() == true) {
channel.close();
}
} catch (Throwable t) {
Logger.error(ExceptionUtil.getStackTrace(t));
} finally {
channel = null;
}
}
}
} else {
Logger.warn("RabbitMQ consumers are paused and napping for 10 secs...");
Expand All @@ -201,6 +173,14 @@ private void goGetHerSon() {
}
}

protected void ack() {
try {
unacknowledgedDeliveryTag.channel.basicAck(unacknowledgedDeliveryTag.deliveryTag, true);
} catch (IOException e) {
throw new UnexpectedException(e);
}
}

/**
* Gets the queue name.
*
Expand All @@ -211,8 +191,6 @@ private void goGetHerSon() {
/**
* Routing key.
*
* @param t
* the t
* @return the string
*/
protected String routingKey() {
Expand Down Expand Up @@ -241,4 +219,13 @@ protected T toObject(byte[] bytes) throws Exception {
return (T) RabbitMQPlugin.mapper().getObject(this.getMessageType(), bytes);
}

}
public static class DeliveryTag {
public long deliveryTag;
public Channel channel;

public DeliveryTag(long deliveryTag, Channel channel) {
this.deliveryTag = deliveryTag;
this.channel = channel;
}
}
}
47 changes: 18 additions & 29 deletions src/play/modules/rabbitmq/consumer/RabbitMQMessageConsumerJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class RabbitMQMessageConsumerJob<T> extends Job<T> {
private T message;

/** The consumer. */
private RabbitMQConsumer consumer;
private RabbitMQConsumer<T> consumer;

/** The retries. */
private int retries;
Expand All @@ -36,12 +36,20 @@ public class RabbitMQMessageConsumerJob<T> extends Job<T> {
/**
* Instantiates a new rabbit mq message consumer job.
*
* @param channel
* the channel
* @param deliveryTag
* the delivery tag
* @param queue
* the queue
* @param consumer
* the consumer
* @param message
* the message
* @param retries
* number of retries
*/
public RabbitMQMessageConsumerJob(Channel channel, long deliveryTag, String queue, RabbitMQConsumer consumer, T message, int retries) {
public RabbitMQMessageConsumerJob(Channel channel, long deliveryTag, String queue, RabbitMQConsumer<T> consumer, T message, int retries) {
this.consumer = consumer;
this.message = message;
this.retries = retries;
Expand Down Expand Up @@ -84,12 +92,9 @@ public void doJob() {
this.consumer.consume(this.message);
success = true;

// Now tell Daddy everything is cool
this.channel.basicAck(this.deliveryTag, false);

// Execution Time
executionTime = new java.util.Date().getTime() - start;
Logger.info("Message %s from queue %s has been processed by consumer %s (execution time: %s ms)", this.message, this.queue, this.consumer, executionTime);
Logger.debug("Message %s from queue %s has been processed by consumer %s (execution time: %s ms)", this.message, this.queue, this.consumer, executionTime);

// Update Stats
play.modules.rabbitmq.RabbitMQPlugin.statsService().record(this.queue, play.modules.rabbitmq.stats.StatsEvent.Type.CONSUMER, play.modules.rabbitmq.stats.StatsEvent.Status.SUCCESS, executionTime);
Expand Down Expand Up @@ -150,29 +155,13 @@ public void doJob() {
// Log Debug
if (!success) {
Logger.error("Final error processing message (%s) with consumer (%s). Last Exception: %s", this.message, this.consumer, exception);
}

// Now tell Daddy everything is cool
try {
this.channel.basicAck(this.deliveryTag, false);
} catch (Throwable e) {
Logger.error(ExceptionUtil.getStackTrace("Error doing a basicAck for tag: " + this.deliveryTag, e));
}

// Cleanup Channel
if ( channel != null && channel.getConnection() != null && channel.getConnection().isOpen() ) {
try {
channel.getConnection().close();
} catch (Throwable t) {
Logger.error(ExceptionUtil.getStackTrace(t));
}
}
if ( channel != null && channel.isOpen() ) {
try {
channel.close();
} catch (Throwable t) {
Logger.error(ExceptionUtil.getStackTrace(t));
}

// Now tell Daddy everything is cool
try {
this.channel.basicAck(this.deliveryTag, false);
} catch (Throwable e) {
Logger.error(ExceptionUtil.getStackTrace("Error doing a basicAck for tag: " + this.deliveryTag, e));
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/play/modules/rabbitmq/producer/RabbitMQPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void doJob() {

// Execution Time
executionTime = new java.util.Date().getTime() - start;
Logger.info("Message %s has been published to queue %s (execution time: %s ms)", this.message, this.queueName, executionTime);
Logger.debug("Message %s has been published to queue %s (execution time: %s ms)", this.message, this.queueName, executionTime);

// Update Stats
play.modules.rabbitmq.RabbitMQPlugin.statsService().record(this.queueName, play.modules.rabbitmq.stats.StatsEvent.Type.PRODUCER, play.modules.rabbitmq.stats.StatsEvent.Status.SUCCESS, executionTime);
Expand Down
Loading