Skip to content

Commit

Permalink
[ISSUE apache#8348] Allow custom fast-failure queues to be added in B…
Browse files Browse the repository at this point in the history
…rokerFastFailure (apache#8347)
  • Loading branch information
RongtongJin authored Jul 4, 2024
1 parent 92c9223 commit 9173def
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2519,4 +2519,6 @@ public ColdDataCgCtrService getColdDataCgCtrService() {
public void setColdDataCgCtrService(ColdDataCgCtrService coldDataCgCtrService) {
this.coldDataCgCtrService = coldDataCgCtrService;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
*/
package org.apache.rocketmq.broker.latency;

import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
Expand All @@ -42,13 +46,26 @@ public class BrokerFastFailure {

private volatile long jstackTime = System.currentTimeMillis();

private final List<Pair<BlockingQueue<Runnable>, Supplier<Long>>> cleanExpiredRequestQueueList = new ArrayList<>();

public BrokerFastFailure(final BrokerController brokerController) {
this.brokerController = brokerController;
initCleanExpiredRequestQueueList();
this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("BrokerFastFailureScheduledThread", true,
brokerController == null ? null : brokerController.getBrokerConfig()));
}

private void initCleanExpiredRequestQueueList() {
cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getSendThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()));
cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getPullThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue()));
cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getLitePullThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInLitePullQueue()));
cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getHeartbeatThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue()));
cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getEndTransactionThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue()));
cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getAckThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInAckQueue()));
cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getAdminBrokerThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInAdminBrokerQueue()));
}

public static RequestTask castRunnable(final Runnable runnable) {
try {
if (runnable instanceof FutureTaskExt) {
Expand Down Expand Up @@ -98,26 +115,9 @@ private void cleanExpiredRequest() {
}
}

cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());

cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());

cleanExpiredRequestInQueue(this.brokerController.getLitePullThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInLitePullQueue());

cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());

cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
.brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());

cleanExpiredRequestInQueue(this.brokerController.getAckThreadPoolQueue(),
brokerController.getBrokerConfig().getWaitTimeMillsInAckQueue());

cleanExpiredRequestInQueue(this.brokerController.getAdminBrokerThreadPoolQueue(),
brokerController.getBrokerConfig().getWaitTimeMillsInAdminBrokerQueue());
for (Pair<BlockingQueue<Runnable>, Supplier<Long>> pair : cleanExpiredRequestQueueList) {
cleanExpiredRequestInQueue(pair.getObject1(), pair.getObject2().get());
}
}

void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
Expand Down Expand Up @@ -154,6 +154,11 @@ void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, fin
}
}

public synchronized void addCleanExpiredRequestQueue(BlockingQueue<Runnable> cleanExpiredRequestQueue,
Supplier<Long> maxWaitTimeMillsInQueueSupplier) {
cleanExpiredRequestQueueList.add(new Pair<>(cleanExpiredRequestQueue, maxWaitTimeMillsInQueueSupplier));
}

public void shutdown() {
this.scheduledExecutorService.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,46 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageStore;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import static org.assertj.core.api.Assertions.assertThat;

public class BrokerFastFailureTest {

private BrokerController brokerController;

private final BrokerConfig brokerConfig = new BrokerConfig();

private MessageStore messageStore;

@Before
public void setUp() {
brokerController = Mockito.mock(BrokerController.class);
messageStore = Mockito.mock(DefaultMessageStore.class);
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
Mockito.when(brokerController.getSendThreadPoolQueue()).thenReturn(queue);
Mockito.when(brokerController.getPullThreadPoolQueue()).thenReturn(queue);
Mockito.when(brokerController.getLitePullThreadPoolQueue()).thenReturn(queue);
Mockito.when(brokerController.getHeartbeatThreadPoolQueue()).thenReturn(queue);
Mockito.when(brokerController.getEndTransactionThreadPoolQueue()).thenReturn(queue);
Mockito.when(brokerController.getAdminBrokerThreadPoolQueue()).thenReturn(queue);
Mockito.when(brokerController.getAckThreadPoolQueue()).thenReturn(queue);
Mockito.when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
Mockito.when(messageStore.isOSPageCacheBusy()).thenReturn(false);
Mockito.when(brokerController.getMessageStore()).thenReturn(messageStore);
}

@Test
public void testCleanExpiredRequestInQueue() throws Exception {
BrokerFastFailure brokerFastFailure = new BrokerFastFailure(null);
BrokerFastFailure brokerFastFailure = new BrokerFastFailure(brokerController);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
brokerFastFailure.cleanExpiredRequestInQueue(queue, 1);
Expand Down Expand Up @@ -63,4 +93,40 @@ public void run() {
assertThat(((FutureTaskExt) queue.peek()).getRunnable()).isEqualTo(requestTask);
}

@Test
public void testCleanExpiredCustomRequestInQueue() throws Exception {
BrokerFastFailure brokerFastFailure = new BrokerFastFailure(brokerController);
brokerFastFailure.start();
brokerConfig.setWaitTimeMillsInAckQueue(10);
BlockingQueue<Runnable> customThreadPoolQueue = new LinkedBlockingQueue<>();
brokerFastFailure.addCleanExpiredRequestQueue(customThreadPoolQueue, () -> brokerConfig.getWaitTimeMillsInAckQueue());

Runnable runnable = new Runnable() {
@Override
public void run() {

}
};
RequestTask requestTask = new RequestTask(runnable, null, null);
customThreadPoolQueue.add(new FutureTaskExt<>(requestTask, null));

Thread.sleep(2000);

assertThat(customThreadPoolQueue.size()).isEqualTo(0);
assertThat(requestTask.isStopRun()).isEqualTo(true);

brokerConfig.setWaitTimeMillsInAckQueue(10000);

RequestTask requestTask2 = new RequestTask(runnable, null, null);
customThreadPoolQueue.add(new FutureTaskExt<>(requestTask2, null));

Thread.sleep(1000);

assertThat(customThreadPoolQueue.size()).isEqualTo(1);
assertThat(((FutureTaskExt) customThreadPoolQueue.peek()).getRunnable()).isEqualTo(requestTask2);

brokerFastFailure.shutdown();

}

}

0 comments on commit 9173def

Please sign in to comment.