Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partial Test: not working.. but just a skeleton of the test to be done #7

Open
wants to merge 2 commits into
base: ARTEMIS-5131
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2744,4 +2744,12 @@ static void getAuthorizationFailureCount(Object source) {

@LogMessage(id = 601781, value = "User {} is getting authorization failure count on target resource: {}", level = LogMessage.Level.INFO)
void getAuthorizationFailureCount(String user, Object source);


static void copyMessage(Object source, Object... args) {
BASE_LOGGER.copyMessage(getCaller(), source, parametersList(args));
}

@LogMessage(id = 601782, value = "User {} is copying a message to another queue on target resource: {} {}", level = LogMessage.Level.INFO)
void copyMessage(String user, Object source, String args);
}
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,10 @@ int moveMessages(@Parameter(name = "flushLimit", desc = "Limit to flush transact
@Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates,
@Parameter(name = "messageCount", desc = "Number of messages to move.") int messageCount) throws Exception;

@Operation(desc = "Send a copy of the message with given messageID to another queue)", impact = MBeanOperationInfo.ACTION)
boolean copyMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID,
@Parameter(name = "targetQueue", desc = "The name of the queue to copy the messages to") String targetQueue) throws Exception;

/**
* Sends the message corresponding to the specified message ID to this queue's dead letter address.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,32 @@ public boolean moveMessage(final long messageID,

}

@Override
public boolean copyMessage(final long messageID,
final String targetQueue) throws Exception {
// this is a critical task, we need to prevent parallel tasks running
try (AutoCloseable lock = server.managementLock()) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.copyMessage(queue, messageID, targetQueue);
}
checkStarted();

clearIO();
try {
Binding binding = server.getPostOffice().getBinding(SimpleString.of(targetQueue));

if (binding == null) {
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(targetQueue);
}

return queue.copyReference(messageID, binding.getAddress(), binding);
} finally {
blockOnIO();
}
}

}

@Override
public int moveMessages(final String filterStr, final String otherQueueName) throws Exception {
return moveMessages(filterStr, otherQueueName, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ int moveReferences(int flushLimit,
int messageCount,
Binding binding) throws Exception;


boolean copyReference(long messageID, SimpleString queue, Binding binding) throws Exception;

int retryMessages(Filter filter) throws Exception;

default int retryMessages(Filter filter, Integer expectedHits) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2791,6 +2791,26 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
});
}

@Override
public synchronized boolean copyReference(final long messageID,
final SimpleString toQueue,
final Binding binding) throws Exception {
try (LinkedListIterator<MessageReference> iter = iterator()) {
while (iter.hasNext()) {
MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID) {
try {
copy(null, toQueue, binding, ref);
} catch (Exception e) {
throw e;
}
return true;
}
}
return false;
}
}

public synchronized int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
@Override
Expand Down Expand Up @@ -3679,6 +3699,38 @@ private RoutingStatus move(final Transaction originalTX,
return routingStatus;
}

private RoutingStatus copy(final Transaction originalTX,
final SimpleString address,
final Binding binding,
final MessageReference ref) throws Exception {
Transaction tx;

if (originalTX != null) {
tx = originalTX;
} else {
// if no TX we create a new one to commit at the end
tx = new TransactionImpl(storageManager);
}

Message copyMessage = makeCopy(ref, false, false, address);

Object originalRoutingType = ref.getMessage().getBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE);
if (originalRoutingType != null && originalRoutingType instanceof Byte) {
copyMessage.setRoutingType(RoutingType.getType((Byte) originalRoutingType));
}

RoutingStatus routingStatus;
{
RoutingContext context = new RoutingContextImpl(tx);
routingStatus = postOffice.route(copyMessage, context, false, false, binding);
}

if (originalTX == null) {
tx.commit();
}
return routingStatus;
}

@SuppressWarnings({"ArrayToString", "ArrayToStringConcatenation"})
private boolean moveBetweenSnFQueues(final SimpleString queueSuffix,
final Transaction tx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,11 @@ public int moveReferences(int flushLimit,
return 0;
}

@Override
public boolean copyReference(long messageID, SimpleString address, Binding binding) throws Exception {
return false;
}

@Override
public int retryMessages(Filter filter) throws Exception {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,11 @@ public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress,
return 0;
}

@Override
public boolean copyReference(long messageID, SimpleString address, Binding binding) throws Exception {
return false;
}

@Override
public void addRedistributor(long delay) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,11 @@ public int moveReferences(int flushLimit, Filter filter, SimpleString toAddress,
return 0;
}

@Override
public boolean copyReference(long messageID, SimpleString address, Binding binding) throws Exception {
return false;
}

@Override
public void forceDelivery() {
// no-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2618,6 +2618,60 @@ public void testMoveMessage() throws Exception {
session.deleteQueue(otherQueue);
}

@TestTemplate
public void testCopyMessage() throws Exception {
SimpleString address = SimpleString.of("address");//RandomUtil.randomSimpleString();
SimpleString queue = SimpleString.of("queue");//RandomUtil.randomSimpleString();
SimpleString otherAddress = SimpleString.of("otherAddress");//RandomUtil.randomSimpleString();
SimpleString otherQueue = SimpleString.of("otherQueue");//RandomUtil.randomSimpleString();
SimpleString otherQueue2 = SimpleString.of("otherQueue2");//RandomUtil.randomSimpleString();


session.createQueue(QueueConfiguration.of(queue).setAddress(address).setDurable(durable));
session.createQueue(QueueConfiguration.of(otherQueue).setAddress(otherAddress).setDurable(durable));
session.createQueue(QueueConfiguration.of(otherQueue2).setAddress(otherAddress).setDurable(durable));
ClientProducer producer = session.createProducer(address);

// send 2 messages on queue
producer.send(session.createMessage(durable));
producer.send(session.createMessage(durable));

QueueControl queueControl = createManagementControl(address, queue);
QueueControl otherQueueControl = createManagementControl(otherAddress, otherQueue);
QueueControl otherQueueControl2 = createManagementControl(otherAddress, otherQueue2);
assertMessageMetrics(queueControl, 2, durable);
assertMessageMetrics(otherQueueControl, 0, durable);
assertMessageMetrics(otherQueueControl2, 0, durable);

// the message IDs are set on the server
Map<String, Object>[] messages = queueControl.listMessages(null);
assertEquals(2, messages.length);
long messageID = (Long) messages[0].get("messageID");

boolean copied = queueControl.copyMessage(messageID, otherQueue.toString());
assertTrue(copied);

assertMessageMetrics(queueControl, 2, durable);
assertMessageMetrics(otherQueueControl, 1, durable);
assertMessageMetrics(otherQueueControl2, 0, durable);

messageID = (Long) messages[1].get("messageID");
copied = queueControl.copyMessage(messageID, otherQueue.toString());
assertTrue(copied);

assertMessageMetrics(queueControl, 2, durable);
assertMessageMetrics(otherQueueControl, 2, durable);
assertMessageMetrics(otherQueueControl2, 0, durable);

consumeMessages(2, session, queue);
consumeMessages(2, session, otherQueue);
consumeMessages(0, session, otherQueue2);

session.deleteQueue(queue);
session.deleteQueue(otherQueue);
session.deleteQueue(otherQueue2);
}

/**
* Moving message from another address to a single "child" queue of a multicast address
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,11 @@ public int moveMessages(int flushLimit, String filter, String otherQueueName, bo
return (Integer) proxy.invokeOperation(Integer.class, "moveMessages", flushLimit, filter, otherQueueName, rejectDuplicates, messageCount);
}

@Override
public boolean copyMessage(long messageID, String targetQueue) throws Exception {
return (Boolean) proxy.invokeOperation("copyMessage", messageID, targetQueue);
}

@Override
public int moveMessages(final String filter,
final String otherQueueName,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.server;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.LastValueQueue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class CopyMessageTest extends ActiveMQTestBase {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private ActiveMQServer server;

@Override
@BeforeEach
public void setUp() throws Exception {
super.setUp();

server = addServer(ActiveMQServers.newActiveMQServer(createDefaultNettyConfig(), true));
// start the server
server.start();
}


@Test
public void testCopyMessageAMQP() throws Exception {

}

private void testCopyMessage(String protocol, boolean isLarge) throws Exception {

String originalAddress = "CopyMessageTest_origin";
String targetAddress = "CopyMessageTest_target";

Queue originalServerQueue = server.createQueue(QueueConfiguration.of(originalAddress).setRoutingType(RoutingType.ANYCAST));
Queue targetServerQueue = server.createQueue(QueueConfiguration.of(targetAddress).setRoutingType(RoutingType.ANYCAST));


int nMessages = 10;

String dummyLoad;
{
StringBuilder builder = new StringBuilder();

if (isLarge) {
while (builder.length() < 110 + 1024) {
builder.append("This is large ");
}
} else {
builder.append("this is small");
}

dummyLoad = builder.toString();
}

ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");

try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue(originalAddress));

for (int i = 0; i < nMessages; i++) {
TextMessage message = session.createTextMessage(getText(i, dummyLoad));
message.setIntProperty("i", i);
producer.send(message);
}
session.commit();

Wait.assertEquals(nMessages, originalServerQueue::getMessageCount);

ArrayList<Long> messagesToCopy = new ArrayList<>();

originalServerQueue.forEach(r -> messagesToCopy.add(r.getMessageID()));

messagesToCopy.forEach(l -> originalServerQueue.);

}



}

private String getText(int i, String dummyLoad) {
return "message " + i + dummyLoad;
}

}