From 497530f779a97c5ce622c0c2ed2d46c2824220c7 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 6 May 2024 21:23:24 +0300 Subject: [PATCH] [fix][broker] Fix thread safety of loadSheddingTask and loadResourceQuotaTask fields --- .../apache/pulsar/broker/PulsarService.java | 52 ++++++++++--------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 559ca1e9e690b..58d7e71b65d84 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -517,9 +517,7 @@ public CompletableFuture closeAsync() { } // cancel loadShedding task and shutdown the loadManager executor before shutting down the broker - if (this.loadSheddingTask != null) { - this.loadSheddingTask.cancel(); - } + cancelLoadBalancerTasks(); executorServicesShutdown.shutdown(loadManagerExecutor); List> asyncCloseFutures = new ArrayList<>(); @@ -1183,20 +1181,7 @@ protected void startLeaderElectionService() { if (state == LeaderElectionState.Leading) { LOG.info("This broker {} was elected leader", getBrokerId()); if (getConfiguration().isLoadBalancerEnabled()) { - long resourceQuotaUpdateInterval = TimeUnit.MINUTES - .toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes()); - - if (loadSheddingTask != null) { - loadSheddingTask.cancel(); - } - if (loadResourceQuotaTask != null) { - loadResourceQuotaTask.cancel(false); - } - loadSheddingTask = new LoadSheddingTask(loadManager, loadManagerExecutor, config); - loadSheddingTask.start(); - loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate( - new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval, - resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS); + startLoadBalancerTasks(); } } else { if (leaderElectionService != null) { @@ -1209,20 +1194,37 @@ protected void startLeaderElectionService() { } } - if (loadSheddingTask != null) { - loadSheddingTask.cancel(); - loadSheddingTask = null; - } - if (loadResourceQuotaTask != null) { - loadResourceQuotaTask.cancel(false); - loadResourceQuotaTask = null; - } + cancelLoadBalancerTasks(); } }); leaderElectionService.start(); } + private synchronized void cancelLoadBalancerTasks() { + if (loadSheddingTask != null) { + loadSheddingTask.cancel(); + loadSheddingTask = null; + } + if (loadResourceQuotaTask != null) { + loadResourceQuotaTask.cancel(false); + loadResourceQuotaTask = null; + } + } + + private synchronized void startLoadBalancerTasks() { + cancelLoadBalancerTasks(); + if (isRunning()) { + long resourceQuotaUpdateInterval = TimeUnit.MINUTES + .toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes()); + loadSheddingTask = new LoadSheddingTask(loadManager, loadManagerExecutor, config); + loadSheddingTask.start(); + loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate( + new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval, + resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS); + } + } + protected void acquireSLANamespace() { try { // Namespace not created hence no need to unload it