diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e899248136592..ab27e5ee983d9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3448,10 +3448,13 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { public synchronized void triggerCompaction() throws PulsarServerException, AlreadyRunningException { if (currentCompaction.isDone()) { + if (!lock.readLock().tryLock()) { + log.info("[{}] Topic is closing or deleting, skip triggering compaction -lock", topic); + return; + } try { - lock.writeLock().lock(); if (isClosingOrDeleting) { - log.info("[{}] Topic is closing or deleting, skip triggering compaction", topic); + log.info("[{}] Topic is closing or deleting, skip triggering compaction -flag", topic); return; } @@ -3461,14 +3464,14 @@ public synchronized void triggerCompaction() } else { currentCompaction = topicCompactionService.compact().thenApply(x -> null); } - currentCompaction.whenComplete((ignore, ex) -> { - if (ex != null) { - log.warn("[{}] Compaction failure.", topic, ex); - } - }); } finally { - lock.writeLock().unlock(); + lock.readLock().unlock(); } + currentCompaction.whenComplete((ignore, ex) -> { + if (ex != null) { + log.warn("[{}] Compaction failure.", topic, ex); + } + }); } else { throw new AlreadyRunningException("Compaction already in progress"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 77eed2fbd24cc..a1dfbe6055876 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -2077,7 +2077,7 @@ public void testDeleteCompactedLedger() throws Exception { @Test public void testDeleteCompactedLedgerWithSlowAck() throws Exception { - // Disable topic level policies, since block ack thread will block topic level policies deleted + // Disable topic level policies, since block ack thread may also block thread of delete topic policies. conf.setTopicLevelPoliciesEnabled(false); restartBroker();