Skip to content

Commit 0652458

Browse files
committed
Add option for deleting all scheduled messages on startup
1 parent aa842da commit 0652458

File tree

6 files changed

+114
-1
lines changed

6 files changed

+114
-1
lines changed

activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java

+14
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ public class BrokerService implements Service {
184184
private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
185185
// to other jms messaging systems
186186
private boolean deleteAllMessagesOnStartup;
187+
private boolean deleteAllScheduledMessagesOnStartup = false;
187188
private boolean advisorySupport = true;
188189
private boolean anonymousProducerAdvisorySupport = false;
189190
private URI vmConnectorURI;
@@ -1630,6 +1631,18 @@ public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStar
16301631
this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
16311632
}
16321633

1634+
/**
1635+
* Sets whether all scheduled messages are deleted on startup
1636+
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1637+
*/
1638+
public void setDeleteAllScheduledMessagesOnStartup(boolean deleteAllScheduledMessagesOnStartup) {
1639+
this.deleteAllScheduledMessagesOnStartup = deleteAllScheduledMessagesOnStartup;
1640+
}
1641+
1642+
public boolean isDeleteAllScheduledMessagesOnStartup() {
1643+
return deleteAllScheduledMessagesOnStartup;
1644+
}
1645+
16331646
public URI getVmConnectorURI() {
16341647
if (vmConnectorURI == null) {
16351648
try {
@@ -2440,6 +2453,7 @@ protected Broker addInterceptors(Broker broker) throws Exception {
24402453
if (isSchedulerSupport()) {
24412454
SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
24422455
sb.setMaxRepeatAllowed(maxSchedulerRepeatAllowed);
2456+
sb.setDeleteAllScheduledMessagesOnStartup(deleteAllScheduledMessagesOnStartup);
24432457
if (isUseJmx()) {
24442458
JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
24452459
try {

activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java

+17
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
7272
private final JobSchedulerStore store;
7373
private JobScheduler scheduler;
7474
private int maxRepeatAllowed = MAX_REPEAT_ALLOWED;
75+
private boolean deleteAllScheduledMessagesOnStartup;
7576

7677
public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception {
7778
super(next);
@@ -212,6 +213,9 @@ public synchronized JobScheduler getJobScheduler() throws Exception {
212213
public void start() throws Exception {
213214
this.started.set(true);
214215
getInternalScheduler();
216+
if (deleteAllScheduledMessagesOnStartup) {
217+
deleteAllScheduledMessages();
218+
}
215219
super.start();
216220
}
217221

@@ -364,6 +368,11 @@ private void doSchedule(Message messageSend, Object cronValue, Object periodValu
364368
new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay, period, repeat);
365369
}
366370

371+
private void deleteAllScheduledMessages() throws Exception {
372+
LOG.info("Deleting all scheduled messages on startup because deleteAllScheduledMessagesOnStartup configuration has been provided");
373+
getInternalScheduler().removeAllJobs();
374+
}
375+
367376
@Override
368377
public void scheduledJob(String id, ByteSequence job) {
369378
org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job.getOffset(), job.getLength());
@@ -487,4 +496,12 @@ public int getMaxRepeatAllowed() {
487496
public void setMaxRepeatAllowed(int maxRepeatAllowed) {
488497
this.maxRepeatAllowed = maxRepeatAllowed;
489498
}
499+
500+
public boolean getDeleteAllScheduledMessagesOnStartup() {
501+
return deleteAllScheduledMessagesOnStartup;
502+
}
503+
504+
public void setDeleteAllScheduledMessagesOnStartup(boolean deleteAllSchedulerdMessagesOnStartup) {
505+
this.deleteAllScheduledMessagesOnStartup = deleteAllSchedulerdMessagesOnStartup;
506+
}
490507
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.activemq.broker.scheduler;
19+
20+
import jakarta.jms.Connection;
21+
import jakarta.jms.Message;
22+
import jakarta.jms.MessageConsumer;
23+
import jakarta.jms.MessageListener;
24+
import jakarta.jms.MessageProducer;
25+
import jakarta.jms.Session;
26+
import jakarta.jms.TextMessage;
27+
import org.apache.activemq.ScheduledMessage;
28+
import org.junit.Test;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.util.concurrent.CountDownLatch;
33+
import java.util.concurrent.TimeUnit;
34+
35+
import static org.junit.Assert.assertEquals;
36+
37+
public class JmsSchedulerDeleteAllMessageOnStartupOptionTest extends JobSchedulerTestSupport {
38+
39+
private static final transient Logger LOG = LoggerFactory.getLogger(JmsSchedulerDeleteAllMessageOnStartupOptionTest.class);
40+
41+
@Override
42+
protected boolean shouldDeleteAllScheduledMessagesOnStartup() throws Exception {
43+
return true;
44+
}
45+
46+
@Test
47+
public void testDeleteAllMessageOnRestart() throws Exception {
48+
// Send a message delayed by 8 seconds
49+
Connection connection = createConnection();
50+
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
51+
connection.start();
52+
long time_ms = 10 * 1000;
53+
MessageProducer producer = session.createProducer(destination);
54+
TextMessage message = session.createTextMessage("test msg");
55+
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time_ms);
56+
producer.send(message);
57+
producer.close();
58+
// Shutdown broker
59+
restartBroker(RestartType.NORMAL);
60+
// Make sure the consumer won't get the message
61+
connection = createConnection();
62+
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
63+
MessageConsumer consumer = session.createConsumer(destination);
64+
final int COUNT = 1;
65+
final CountDownLatch latch = new CountDownLatch(COUNT);
66+
consumer.setMessageListener(new MessageListener() {
67+
@Override
68+
public void onMessage(Message message) {
69+
latch.countDown();
70+
}
71+
});
72+
latch.await(20, TimeUnit.SECONDS);
73+
assertEquals(latch.getCount(), COUNT);
74+
connection.close();
75+
}
76+
}

activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java

+5
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ protected BrokerService createBroker(boolean delete) throws Exception {
116116
answer.setSchedulerDirectoryFile(schedulerDirectory);
117117
answer.setSchedulerSupport(true);
118118
answer.setUseJmx(isUseJmx());
119+
answer.setDeleteAllScheduledMessagesOnStartup(shouldDeleteAllScheduledMessagesOnStartup());
119120
return answer;
120121
}
121122

@@ -136,4 +137,8 @@ protected void restartBroker(RestartType restartType) throws Exception {
136137
broker.start();
137138
broker.waitUntilStarted();
138139
}
140+
141+
protected boolean shouldDeleteAllScheduledMessagesOnStartup() throws Exception {
142+
return false;
143+
}
139144
}

activemq-unit-tests/src/test/java/org/apache/activemq/config/ConfigTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ public void testBrokerConfig() throws Exception {
170170
assertEquals("Broker Config Error (persistent)", false, broker.isPersistent());
171171
assertEquals("Broker Config Error (useShutdownHook)", false, broker.isUseShutdownHook());
172172
assertEquals("Broker Config Error (deleteAllMessagesOnStartup)", true, broker.isDeleteAllMessagesOnStartup());
173+
assertEquals("Broker Config Error (deleteAllScheduledMessagesOnStartup)", true, broker.isDeleteAllScheduledMessagesOnStartup());
173174
LOG.info("Success");
174175

175176
// Check specific vm transport

activemq-unit-tests/src/test/resources/org/apache/activemq/config/example.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
<amq:broker brokerName="brokerConfigTest" populateJMSXUserID="false"
2727
useLoggingForShutdownErrors="true" useJmx="true"
2828
persistent="false" vmConnectorURI="vm://javacoola"
29-
useShutdownHook="false" deleteAllMessagesOnStartup="true">
29+
useShutdownHook="false" deleteAllMessagesOnStartup="true" deleteAllScheduledMessagesOnStartup="true">
3030

3131
<!--
3232
|| NOTE this config file is used for unit testing the configuration mechanism

0 commit comments

Comments
 (0)