Skip to content
This repository has been archived by the owner on Oct 5, 2021. It is now read-only.

Introduce persistence layer for the broker #106

Merged
merged 7 commits into from
Jan 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,13 @@ public void operationComplete(ChannelFuture future) {
}
}
}

@Override
public String toString() {
return "AmqpConsumer{"
+ "queueName='" + queueName + '\''
+ ", consumerTag=" + consumerTag
+ ", isExclusive=" + isExclusive
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public void write(ChannelHandlerContext ctx) {
ShortString.parseString(metadata.getRoutingKey()));

HeaderFrame headerFrame = new HeaderFrame(channel.getChannelId(), 60, metadata.getContentLength());
headerFrame.setRawMetadata(metadata.getRawMetadata());
headerFrame.setProperties(metadata.getProperties());
headerFrame.setHeaders(metadata.getHeaders());
ctx.write(basicDeliverFrame);
ctx.write(headerFrame);
for (ContentChunk chunk : message.getContentChunks()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,4 @@ protected void initChannel(SocketChannel socketChannel) {
.addLast(ioExecutors, new BlockingTaskHandler());
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.wso2.broker.amqp.AckData;
import org.wso2.broker.amqp.AmqpConsumer;
import org.wso2.broker.amqp.AmqpDeliverMessage;
import org.wso2.broker.amqp.AmqpException;
import org.wso2.broker.common.data.types.FieldTable;
import org.wso2.broker.common.data.types.ShortString;
import org.wso2.broker.core.Broker;
Expand Down Expand Up @@ -135,25 +134,26 @@ public void close() {
}
}

public void cancelConsumer(ShortString consumerTag) throws AmqpException {
AmqpConsumer amqpConsumer = consumerMap.get(consumerTag);
public void cancelConsumer(ShortString consumerTag) throws ChannelException {
AmqpConsumer amqpConsumer = consumerMap.remove(consumerTag);
if (amqpConsumer != null) {
broker.removeConsumer(amqpConsumer);
} else {
throw new AmqpException("Invalid Consumer tag [ " + consumerTag + " ] for the channel: " + channelId);
throw new ChannelException(ChannelException.NOT_FOUND,
"Invalid Consumer tag [ " + consumerTag + " ] for the channel: " + channelId);
}
}

public InMemoryMessageAggregator getMessageAggregator() {
return messageAggregator;
}

public void acknowledge(long deliveryTag, boolean multiple) {
public void acknowledge(long deliveryTag, boolean multiple) throws BrokerException {
//TODO handle multiple
AckData ackData = unackedMessageMap.remove(deliveryTag);
if (ackData != null) {
ackData.getMessage().release();
broker.acknowledge(ackData.getQueueName(), ackData.getMessage().getMetadata().getInternalId());
broker.acknowledge(ackData.getQueueName(), ackData.getMessage());
} else {
LOGGER.warn("Could not find a matching ack data for acking the delivery tag " + deliveryTag);
}
Expand All @@ -178,7 +178,7 @@ public void recordMessageDelivery(long deliveryTag, AckData ackData) {
unackedMessageMap.put(deliveryTag, ackData);
}

public void reject(long deliveryTag, boolean requeue) {
public void reject(long deliveryTag, boolean requeue) throws BrokerException {
AckData ackData = unackedMessageMap.remove(deliveryTag);
if (ackData != null) {
Message message = ackData.getMessage();
Expand All @@ -203,7 +203,7 @@ public Collection<AckData> recover() {
return unackedMessageMap.clear();
}

public void rejectAll() {
public void rejectAll() throws BrokerException {
Collection<AckData> entries = unackedMessageMap.clear();
for (AckData ackData : entries) {
Message message = ackData.getMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
* but not other channels in the same connection.
*/
public class ChannelException extends Exception {

public static final int NOT_ALLOWED = 530;

public static final int NOT_FOUND = 404;

private final int replyCode;

public ChannelException(int replyCode, String replyText) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@

import io.netty.buffer.ByteBuf;
import org.wso2.broker.amqp.AmqpException;
import org.wso2.broker.common.data.types.FieldTable;
import org.wso2.broker.common.data.types.ShortString;
import org.wso2.broker.core.Broker;
import org.wso2.broker.core.BrokerException;
import org.wso2.broker.core.ContentChunk;
import org.wso2.broker.core.Message;
import org.wso2.broker.core.Metadata;

import java.util.function.BiFunction;

/**
* Handles incoming AMQP message frames and creates {@link Message}.
*/
Expand Down Expand Up @@ -57,14 +56,15 @@ public void basicPublishReceived(ShortString routingKey, ShortString exchangeNam
/**
* Add the header frame that gives the relevant metadata for the given message.
*
* @param rawMetadata unprocessed raw metadata {@link ByteBuf}
* @param headers protocol specific headers
* @param properties properties of the message
* @param payloadSize total message content length in bytes
*/
public void headerFrameReceived(ByteBuf rawMetadata, long payloadSize,
BiFunction<ByteBuf, Metadata, Boolean> headerParser) {
public void headerFrameReceived(FieldTable headers, FieldTable properties, long payloadSize) {
long messageId = broker.getNextMessageId();
Metadata metadata = new Metadata(messageId, routingKey, exchangeName, payloadSize);
metadata.setRawMetadata(rawMetadata, headerParser);
metadata.setProperties(properties);
metadata.setHeaders(headers);
message = new Message(metadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import org.wso2.broker.amqp.AmqpException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.broker.amqp.codec.AmqpChannel;
import org.wso2.broker.amqp.codec.BlockingTask;
import org.wso2.broker.amqp.codec.ChannelException;
import org.wso2.broker.amqp.codec.handlers.AmqpConnectionHandler;
import org.wso2.broker.common.data.types.ShortString;

Expand All @@ -32,6 +34,12 @@
*/
public class BasicCancel extends MethodFrame {

private static final Logger LOGGER = LoggerFactory.getLogger(BasicCancel.class);

private static final int CLASS_ID = 60;

private static final int METHOD_ID = 30;

private final ShortString consumerTag;

private final boolean noWait;
Expand Down Expand Up @@ -61,8 +69,10 @@ public void handle(ChannelHandlerContext ctx, AmqpConnectionHandler connectionHa
try {
channel.cancelConsumer(consumerTag);
ctx.writeAndFlush(new BasicCancelOk(getChannel(), consumerTag));
} catch (AmqpException e) {
// TODO handle exception: write the error back to client
} catch (ChannelException e) {
LOGGER.error("Error occurred while closing consumer.", e);
ctx.writeAndFlush(new ChannelClose(getChannel(), e.getReplyCode(),
ShortString.parseString(e.getMessage()), CLASS_ID, METHOD_ID));
}
});
}
Expand All @@ -84,7 +94,7 @@ public boolean isNoWait() {
public static AmqMethodBodyFactory getFactory() {
return (buf, channel, size) -> {
ShortString consumerTag = ShortString.parse(buf);
boolean noWait = buf.readBoolean();;
boolean noWait = buf.readBoolean();
return new BasicCancel(channel, consumerTag, noWait);
};
}
Expand Down
Loading