From 9173def4040f00ea5a4f1e913382b8ccd2b08d17 Mon Sep 17 00:00:00 2001 From: rongtong Date: Thu, 4 Jul 2024 16:39:53 +0800 Subject: [PATCH] [ISSUE #8348] Allow custom fast-failure queues to be added in BrokerFastFailure (#8347) --- .../rocketmq/broker/BrokerController.java | 2 + .../broker/latency/BrokerFastFailure.java | 45 ++++++------ .../broker/latency/BrokerFastFailureTest.java | 68 ++++++++++++++++++- 3 files changed, 94 insertions(+), 21 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 76224db5cb5..145a9522306 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -2519,4 +2519,6 @@ public ColdDataCgCtrService getColdDataCgCtrService() { public void setColdDataCgCtrService(ColdDataCgCtrService coldDataCgCtrService) { this.coldDataCgCtrService = coldDataCgCtrService; } + + } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index 0135ac929a7..ce8fdd88579 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -16,11 +16,15 @@ */ package org.apache.rocketmq.broker.latency; +import java.util.List; +import java.util.ArrayList; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.AbstractBrokerRunnable; +import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -42,13 +46,26 @@ public class BrokerFastFailure { private volatile long jstackTime = System.currentTimeMillis(); + private final List, Supplier>> cleanExpiredRequestQueueList = new ArrayList<>(); + public BrokerFastFailure(final BrokerController brokerController) { this.brokerController = brokerController; + initCleanExpiredRequestQueueList(); this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1, new ThreadFactoryImpl("BrokerFastFailureScheduledThread", true, brokerController == null ? null : brokerController.getBrokerConfig())); } + private void initCleanExpiredRequestQueueList() { + cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getSendThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue())); + cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getPullThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue())); + cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getLitePullThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInLitePullQueue())); + cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getHeartbeatThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue())); + cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getEndTransactionThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue())); + cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getAckThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInAckQueue())); + cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getAdminBrokerThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInAdminBrokerQueue())); + } + public static RequestTask castRunnable(final Runnable runnable) { try { if (runnable instanceof FutureTaskExt) { @@ -98,26 +115,9 @@ private void cleanExpiredRequest() { } } - cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(), - this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()); - - cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(), - this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue()); - - cleanExpiredRequestInQueue(this.brokerController.getLitePullThreadPoolQueue(), - this.brokerController.getBrokerConfig().getWaitTimeMillsInLitePullQueue()); - - cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(), - this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue()); - - cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this - .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue()); - - cleanExpiredRequestInQueue(this.brokerController.getAckThreadPoolQueue(), - brokerController.getBrokerConfig().getWaitTimeMillsInAckQueue()); - - cleanExpiredRequestInQueue(this.brokerController.getAdminBrokerThreadPoolQueue(), - brokerController.getBrokerConfig().getWaitTimeMillsInAdminBrokerQueue()); + for (Pair, Supplier> pair : cleanExpiredRequestQueueList) { + cleanExpiredRequestInQueue(pair.getObject1(), pair.getObject2().get()); + } } void cleanExpiredRequestInQueue(final BlockingQueue blockingQueue, final long maxWaitTimeMillsInQueue) { @@ -154,6 +154,11 @@ void cleanExpiredRequestInQueue(final BlockingQueue blockingQueue, fin } } + public synchronized void addCleanExpiredRequestQueue(BlockingQueue cleanExpiredRequestQueue, + Supplier maxWaitTimeMillsInQueueSupplier) { + cleanExpiredRequestQueueList.add(new Pair<>(cleanExpiredRequestQueue, maxWaitTimeMillsInQueueSupplier)); + } + public void shutdown() { this.scheduledExecutorService.shutdown(); } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java index 31b547cf1be..2216a1d50c1 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java @@ -19,16 +19,46 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.future.FutureTaskExt; import org.apache.rocketmq.remoting.netty.RequestTask; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.MessageStore; +import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import static org.assertj.core.api.Assertions.assertThat; public class BrokerFastFailureTest { + + private BrokerController brokerController; + + private final BrokerConfig brokerConfig = new BrokerConfig(); + + private MessageStore messageStore; + + @Before + public void setUp() { + brokerController = Mockito.mock(BrokerController.class); + messageStore = Mockito.mock(DefaultMessageStore.class); + BlockingQueue queue = new LinkedBlockingQueue<>(); + Mockito.when(brokerController.getSendThreadPoolQueue()).thenReturn(queue); + Mockito.when(brokerController.getPullThreadPoolQueue()).thenReturn(queue); + Mockito.when(brokerController.getLitePullThreadPoolQueue()).thenReturn(queue); + Mockito.when(brokerController.getHeartbeatThreadPoolQueue()).thenReturn(queue); + Mockito.when(brokerController.getEndTransactionThreadPoolQueue()).thenReturn(queue); + Mockito.when(brokerController.getAdminBrokerThreadPoolQueue()).thenReturn(queue); + Mockito.when(brokerController.getAckThreadPoolQueue()).thenReturn(queue); + Mockito.when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); + Mockito.when(messageStore.isOSPageCacheBusy()).thenReturn(false); + Mockito.when(brokerController.getMessageStore()).thenReturn(messageStore); + } + @Test public void testCleanExpiredRequestInQueue() throws Exception { - BrokerFastFailure brokerFastFailure = new BrokerFastFailure(null); + BrokerFastFailure brokerFastFailure = new BrokerFastFailure(brokerController); BlockingQueue queue = new LinkedBlockingQueue<>(); brokerFastFailure.cleanExpiredRequestInQueue(queue, 1); @@ -63,4 +93,40 @@ public void run() { assertThat(((FutureTaskExt) queue.peek()).getRunnable()).isEqualTo(requestTask); } + @Test + public void testCleanExpiredCustomRequestInQueue() throws Exception { + BrokerFastFailure brokerFastFailure = new BrokerFastFailure(brokerController); + brokerFastFailure.start(); + brokerConfig.setWaitTimeMillsInAckQueue(10); + BlockingQueue customThreadPoolQueue = new LinkedBlockingQueue<>(); + brokerFastFailure.addCleanExpiredRequestQueue(customThreadPoolQueue, () -> brokerConfig.getWaitTimeMillsInAckQueue()); + + Runnable runnable = new Runnable() { + @Override + public void run() { + + } + }; + RequestTask requestTask = new RequestTask(runnable, null, null); + customThreadPoolQueue.add(new FutureTaskExt<>(requestTask, null)); + + Thread.sleep(2000); + + assertThat(customThreadPoolQueue.size()).isEqualTo(0); + assertThat(requestTask.isStopRun()).isEqualTo(true); + + brokerConfig.setWaitTimeMillsInAckQueue(10000); + + RequestTask requestTask2 = new RequestTask(runnable, null, null); + customThreadPoolQueue.add(new FutureTaskExt<>(requestTask2, null)); + + Thread.sleep(1000); + + assertThat(customThreadPoolQueue.size()).isEqualTo(1); + assertThat(((FutureTaskExt) customThreadPoolQueue.peek()).getRunnable()).isEqualTo(requestTask2); + + brokerFastFailure.shutdown(); + + } + } \ No newline at end of file