Skip to content

Commit

Permalink
ARTEMIS-5131 Add A Copy message button to console
Browse files Browse the repository at this point in the history
This exposes a copyMessage method that simply copies a message to a different queue so the new console can add a copy button
  • Loading branch information
andytaylor committed Nov 14, 2024
1 parent bc077f4 commit 1b8a734
Show file tree
Hide file tree
Showing 11 changed files with 541 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2744,4 +2744,12 @@ static void getAuthorizationFailureCount(Object source) {

@LogMessage(id = 601781, value = "User {} is getting authorization failure count on target resource: {}", level = LogMessage.Level.INFO)
void getAuthorizationFailureCount(String user, Object source);


static void copyMessage(Object source, Object... args) {
BASE_LOGGER.copyMessage(getCaller(), source, parametersList(args));
}

@LogMessage(id = 601782, value = "User {} is copying a message to another queue on target resource: {} {}", level = LogMessage.Level.INFO)
void copyMessage(String user, Object source, String args);
}
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,10 @@ int moveMessages(@Parameter(name = "flushLimit", desc = "Limit to flush transact
@Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates,
@Parameter(name = "messageCount", desc = "Number of messages to move.") int messageCount) throws Exception;

@Operation(desc = "Send a copy of the message with given messageID to another queue)", impact = MBeanOperationInfo.ACTION)
boolean copyMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID,
@Parameter(name = "targetQueue", desc = "The name of the queue to copy the messages to") String targetQueue) throws Exception;

/**
* Sends the message corresponding to the specified message ID to this queue's dead letter address.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,32 @@ public boolean moveMessage(final long messageID,

}

@Override
public boolean copyMessage(final long messageID,
final String targetQueue) throws Exception {
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.copyMessage(queue, messageID, targetQueue);
}
checkStarted();

clearIO();
try {
Binding binding = server.getPostOffice().getBinding(SimpleString.of(targetQueue));

if (binding == null) {
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(targetQueue);
}

return queue.copyReference(messageID, binding.getAddress(), binding);
} finally {
blockOnIO();
}
}

}

@Override
public int moveMessages(final String filterStr, final String otherQueueName) throws Exception {
return moveMessages(filterStr, otherQueueName, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ int moveReferences(int flushLimit,
int messageCount,
Binding binding) throws Exception;


boolean copyReference(long messageID, SimpleString queue, Binding binding) throws Exception;

int retryMessages(Filter filter) throws Exception;

default int retryMessages(Filter filter, Integer expectedHits) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2790,6 +2790,26 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
});
}

@Override
public synchronized boolean copyReference(final long messageID,
final SimpleString toQueue,
final Binding binding) throws Exception {
try (LinkedListIterator<MessageReference> iter = iterator()) {
while (iter.hasNext()) {
MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID) {
try {
copy(null, toQueue, binding, ref);
} catch (Exception e) {
throw e;
}
return true;
}
}
return false;
}
}

public synchronized int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
@Override
Expand Down Expand Up @@ -3678,6 +3698,38 @@ private RoutingStatus move(final Transaction originalTX,
return routingStatus;
}

private RoutingStatus copy(final Transaction originalTX,
final SimpleString address,
final Binding binding,
final MessageReference ref) throws Exception {
Transaction tx;

if (originalTX != null) {
tx = originalTX;
} else {
// if no TX we create a new one to commit at the end
tx = new TransactionImpl(storageManager);
}

Message copyMessage = makeCopy(ref, false, false, address);

Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE);
if (originalRoutingType != null && originalRoutingType instanceof Byte) {
copyMessage.setRoutingType(RoutingType.getType((Byte) originalRoutingType));
}

RoutingStatus routingStatus;
{
RoutingContext context = new RoutingContextImpl(tx);
routingStatus = postOffice.route(copyMessage, context, false, false, binding);
}

if (originalTX == null) {
tx.commit();
}
return routingStatus;
}

@SuppressWarnings({"ArrayToString", "ArrayToStringConcatenation"})
private boolean moveBetweenSnFQueues(final SimpleString queueSuffix,
final Transaction tx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,11 @@ public int moveReferences(int flushLimit,
return 0;
}

@Override
public boolean copyReference(long messageID, SimpleString address, Binding binding) throws Exception {
return false;
}

@Override
public int retryMessages(Filter filter) throws Exception {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,11 @@ public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress,
return 0;
}

@Override
public boolean copyReference(long messageID, SimpleString address, Binding binding) throws Exception {
return false;
}

@Override
public void addRedistributor(long delay) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,11 @@ public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress,
return 0;
}

@Override
public boolean copyReference(long messageID, SimpleString address, Binding binding) throws Exception {
return false;
}

@Override
public void forceDelivery() {
// no-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@
package org.apache.activemq.artemis.tests.integration.management;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonNumber;
import org.apache.activemq.artemis.json.JsonObject;
import org.apache.activemq.artemis.json.JsonValue;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.JsonUtil;
Expand Down Expand Up @@ -200,6 +205,125 @@ public void testListMessagesAsJSONWhilePagingOnGoing() throws Exception {
assertNull(console.getError());
}

@Test
public void testCopyMessageWhilstPaging() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();

SimpleString otherAddress = RandomUtil.randomSimpleString();
SimpleString otherQueue = RandomUtil.randomSimpleString();

session1.createQueue(QueueConfiguration.of(queue).setAddress(address));
session1.createQueue(QueueConfiguration.of(otherQueue).setAddress(otherAddress));

QueueControl queueControl = createManagementControl(address, queue);

QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue);

int num = 100;

ClientProducer producer = session1.createProducer(address);
for (int i = 0; i < num; i++) {
ClientMessage message = session1.createMessage(true).writeBodyBufferString("Message" + i);
producer.send(message);
}

Map<String, Object>[] messages = queueControl.listMessages(null);

long messageID = (Long) messages[99].get("messageID");

assertFalse(queueControl.copyMessage(messageID, otherQueue.toString()));

messageID = (Long) messages[0].get("messageID");

assertTrue(queueControl.copyMessage(messageID, otherQueue.toString()));

Map<String, Object>[] copiedMessages = otherQueueControl.listMessages(null);

assertEquals(1, copiedMessages.length);
}

@Test
public void testCopyMessageWhilstPagingSameAddress() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();

SimpleString otherQueue = RandomUtil.randomSimpleString();

session1.createQueue(QueueConfiguration.of(queue).setAddress(address).setRoutingType(RoutingType.ANYCAST));
session1.createQueue(QueueConfiguration.of(otherQueue).setAddress(address).setRoutingType(RoutingType.ANYCAST));

QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST);

QueueControl otherQueueControl = createManagementControl(address, otherQueue, RoutingType.ANYCAST);

int num = 200;

ClientProducer producer = session1.createProducer(address);
for (int i = 0; i < num; i++) {
ClientMessage message = session1.createMessage(true).writeBodyBufferString("Message" + i);
producer.send(message);
}

Map<String, Object>[] messages = queueControl.listMessages(null);

assertEquals(100, messages.length);

Map<String, Object>[] otherMessages = otherQueueControl.listMessages(null);

assertEquals(100, otherMessages.length);

long messageID = (Long) messages[0].get("messageID");

assertTrue(queueControl.copyMessage(messageID, otherQueue.toString()));

otherMessages = otherQueueControl.listMessages(null);

assertEquals(101, otherMessages.length);

messageID = (Long) otherMessages[100].get("messageID");

//this should fail as the message was paged successfully
assertFalse(otherQueueControl.copyMessage(messageID, queue.toString()));
}

@Test
public void testMoveMessageWhilstPagingAndConsuming() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();

SimpleString otherAddress = RandomUtil.randomSimpleString();
SimpleString otherQueue = RandomUtil.randomSimpleString();

session1.createQueue(QueueConfiguration.of(queue).setAddress(address));
session1.createQueue(QueueConfiguration.of(otherQueue).setAddress(otherAddress));

QueueControl queueControl = createManagementControl(address, queue);

QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue);

int num = 1000;

ClientProducer producer = session1.createProducer(address);
for (int i = 0; i < num; i++) {
ClientMessage message = session1.createMessage(true).writeBodyBufferString("Message" + i);
producer.send(message);
}

ManagementCopyThread console = new ManagementCopyThread(queueControl, otherQueue.toString());
ReceiverThread receiver = new ReceiverThread(queue, num, 0);
console.start();
receiver.start();

receiver.join();
console.stop = true;
console.join();

Map<String, Object>[] messages = otherQueueControl.listMessages(null);

assertEquals(messages.length, console.copiedMessages);
}

@Override
@BeforeEach
public void setUp() throws Exception {
Expand Down Expand Up @@ -345,4 +469,44 @@ public void exit() {
stop = true;
}
}

private class ManagementCopyThread extends Thread {

private QueueControl queueControl;
private String queue;
private volatile boolean stop = false;

int copiedMessages = 0;
private Exception error = null;

private ManagementCopyThread(QueueControl queueControl, String queue) {
this.queueControl = queueControl;
this.queue = queue;
}

@Override
public void run() {
try {
Random random = new Random(System.currentTimeMillis());
while (!stop) {
long messageID = random.nextInt(1000);
boolean copied = queueControl.copyMessage(messageID, queue);
System.out.println("messageID = " + messageID);
if (copied) {
copiedMessages++;
}
}
} catch (Exception e) {
error = e;
}
}

public Exception getError() {
return error;
}

public void exit() {
stop = true;
}
}
}
Loading

0 comments on commit 1b8a734

Please sign in to comment.