diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java index f7279968c51bb..f6f8d0b9e0f80 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -28,6 +29,8 @@ import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.compaction.CompactedTopic; +import org.apache.pulsar.compaction.CompactedTopicContext; +import org.apache.pulsar.compaction.CompactedTopicImpl; import org.apache.pulsar.compaction.Compactor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,5 +109,19 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { } } + CompletableFuture cleanCompactedLedger() { + final CompletableFuture compactedTopicContextFuture = + ((CompactedTopicImpl) compactedTopic).getCompactedTopicContextFuture(); + if (compactedTopicContextFuture != null) { + return compactedTopicContextFuture.thenCompose(context -> { + long compactedLedgerId = context.getLedger().getId(); + ((CompactedTopicImpl) compactedTopic).reset(); + return compactedTopic.deleteCompactedLedger(compactedLedgerId); + }); + } else { + return CompletableFuture.completedFuture(null); + } + } + private static final Logger log = LoggerFactory.getLogger(CompactorSubscription.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index db27057029a76..6b9cb044428df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -201,7 +201,8 @@ public static boolean isDedupCursorName(String name) { protected final MessageDeduplication messageDeduplication; private static final long COMPACTION_NEVER_RUN = -0xfebecffeL; - private CompletableFuture currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN); + private volatile CompletableFuture currentCompaction = CompletableFuture.completedFuture( + COMPACTION_NEVER_RUN); private final CompactedTopic compactedTopic; private CompletableFuture currentOffload = CompletableFuture.completedFuture( @@ -1031,13 +1032,13 @@ public CompletableFuture unsubscribe(String subscriptionName) { new AsyncCallbacks.DeleteLedgerCallback() { @Override public void deleteLedgerComplete(Object ctx) { - asyncDeleteCursor(subscriptionName, unsubscribeFuture); + asyncDeleteCursorWithCleanCompactionLedger(subscriptionName, unsubscribeFuture); } @Override public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { if (exception instanceof MetadataNotFoundException) { - asyncDeleteCursor(subscriptionName, unsubscribeFuture); + asyncDeleteCursorWithCleanCompactionLedger(subscriptionName, unsubscribeFuture); return; } @@ -1047,12 +1048,41 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { } }, null); } else { - asyncDeleteCursor(subscriptionName, unsubscribeFuture); + asyncDeleteCursorWithCleanCompactionLedger(subscriptionName, unsubscribeFuture); } return unsubscribeFuture; } + private void asyncDeleteCursorWithCleanCompactionLedger(String subscriptionName, + CompletableFuture unsubscribeFuture) { + PersistentSubscription subscription = subscriptions.get(subscriptionName); + if (subscription == null) { + log.warn("[{}][{}] Can't find subscription, skip delete cursor", topic, subscriptionName); + unsubscribeFuture.complete(null); + return; + } + + if ((!isCompactionSubscription(subscriptionName)) || !(subscription instanceof CompactorSubscription)) { + asyncDeleteCursor(subscriptionName, unsubscribeFuture); + return; + } + + currentCompaction.handle((__, e) -> { + if (e != null) { + log.warn("[{}][{}] Last compaction task failed", topic, subscriptionName); + } + return ((CompactorSubscription) subscription).cleanCompactedLedger(); + }).whenComplete((__, ex) -> { + if (ex != null) { + log.error("[{}][{}] Error cleaning compacted ledger", topic, subscriptionName, ex); + unsubscribeFuture.completeExceptionally(ex); + } else { + asyncDeleteCursor(subscriptionName, unsubscribeFuture); + } + }); + } + private void asyncDeleteCursor(String subscriptionName, CompletableFuture unsubscribeFuture) { ledger.asyncDeleteCursor(Codec.encode(subscriptionName), new DeleteCursorCallback() { @Override @@ -2824,11 +2854,24 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { public synchronized void triggerCompaction() throws PulsarServerException, AlreadyRunningException { if (currentCompaction.isDone()) { - currentCompaction = brokerService.pulsar().getCompactor().compact(topic); + if (!lock.readLock().tryLock()) { + log.info("[{}] Conflict topic-close, topic-delete, skip triggering compaction", topic); + return; + } + try { + if (isClosingOrDeleting) { + log.info("[{}] Topic is closing or deleting, skip triggering compaction", topic); + return; + } + + currentCompaction = brokerService.pulsar().getCompactor().compact(topic); + } finally { + lock.readLock().unlock(); + } currentCompaction.whenComplete((ignore, ex) -> { - if (ex != null){ - log.warn("[{}] Compaction failure.", topic, ex); - } + if (ex != null) { + log.warn("[{}] Compaction failure.", topic, ex); + } }); } else { throw new AlreadyRunningException("Compaction already in progress"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 8faf02c81b3c6..8cc771959f10d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -59,6 +59,7 @@ public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE); consumerConfiguration.setReadCompacted(true); consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest); + consumerConfiguration.setAckReceiptEnabled(true); consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture); @@ -122,7 +123,7 @@ static class RawConsumerImpl extends ConsumerImpl { MessageId.earliest, 0 /* startMessageRollbackDurationInSec */, Schema.BYTES, null, - true + false ); incomingRawMessages = new GrowableArrayBlockingQueue<>(); pendingRawReceives = new ConcurrentLinkedQueue<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index 9b25b895ba8d3..f4767fc65333c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -31,6 +31,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import javax.annotation.Nullable; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; @@ -330,6 +331,16 @@ private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) public synchronized Optional getCompactionHorizon() { return Optional.ofNullable(this.compactionHorizon); } + + public void reset() { + this.compactionHorizon = null; + this.compactedTopicContext = null; + } + + @Nullable + public CompletableFuture getCompactedTopicContextFuture() { + return compactedTopicContext; + } private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 6792d4a4e2c23..c1fb370ae6fe9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -26,6 +26,7 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -47,15 +48,19 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import lombok.Cleanup; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.api.OpenBuilder; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.Position; +import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.BrokerTestUtil; @@ -64,6 +69,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.SystemTopic; import org.apache.pulsar.client.admin.LongRunningProcessStatus; @@ -84,6 +90,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; @@ -1969,4 +1976,128 @@ public void testCompactionDuplicate() throws Exception { } } } + + @Test + public void testDeleteCompactedLedger() throws Exception { + String topicName = "persistent://my-property/use/my-ns/testDeleteCompactedLedger"; + + final String subName = "my-sub"; + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .enableBatching(false).topic(topicName).create(); + + pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).readCompacted(true).subscribe().close(); + + for (int i = 0; i < 10; i++) { + producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync(); + } + producer.flush(); + + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topicName).get(); + + MutableLong compactedLedgerId = new MutableLong(-1); + Awaitility.await().untilAsserted(() -> { + PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName); + Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L); + compactedLedgerId.setValue(stats.compactedLedger.ledgerId); + Assert.assertEquals(stats.compactedLedger.entries, 2L); + }); + + // delete compacted ledger + admin.topics().deleteSubscription(topicName, "__compaction"); + + Awaitility.await().untilAsserted(() -> { + PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName); + Assert.assertEquals(stats.compactedLedger.ledgerId, -1L); + Assert.assertEquals(stats.compactedLedger.entries, -1L); + assertThrows(BKException.BKNoSuchLedgerExistsException.class, () -> pulsar.getBookKeeperClient() + .openLedger(compactedLedgerId.getValue(), BookKeeper.DigestType.CRC32C, new byte[]{})); + }); + + compactor.compact(topicName).get(); + + MutableLong compactedLedgerId2 = new MutableLong(-1); + Awaitility.await().untilAsserted(() -> { + PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName); + Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L); + compactedLedgerId2.setValue(stats.compactedLedger.ledgerId); + Assert.assertEquals(stats.compactedLedger.entries, 2L); + }); + + producer.close(); + admin.topics().delete(topicName); + + Awaitility.await().untilAsserted(() -> assertThrows(BKException.BKNoSuchLedgerExistsException.class, + () -> pulsar.getBookKeeperClient().openLedger( + compactedLedgerId2.getValue(), BookKeeper.DigestType.CRC32, new byte[]{}))); + } + + @Test + public void testDeleteCompactedLedgerWithSlowAck() throws Exception { + // Disable topic level policies, since block ack thread may also block thread of delete topic policies. + conf.setTopicLevelPoliciesEnabled(false); + restartBroker(); + + String topicName = "persistent://my-property/use/my-ns/testDeleteCompactedLedgerWithSlowAck"; + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .enableBatching(false).topic(topicName).create(); + + pulsarClient.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Exclusive) + .subscriptionName(Compactor.COMPACTION_SUBSCRIPTION) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).readCompacted(true).subscribe() + .close(); + + for (int i = 0; i < 10; i++) { + producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync(); + } + producer.flush(); + + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + PersistentSubscription subscription = spy(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION)); + topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, subscription); + + AtomicLong compactedLedgerId = new AtomicLong(-1); + AtomicBoolean pauseAck = new AtomicBoolean(); + Mockito.doAnswer(invocationOnMock -> { + Map properties = (Map) invocationOnMock.getArguments()[2]; + log.info("acknowledgeMessage properties: {}", properties); + compactedLedgerId.set(properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY)); + pauseAck.set(true); + while (pauseAck.get()) { + Thread.sleep(200); + } + return invocationOnMock.callRealMethod(); + }).when(subscription).acknowledgeMessage(Mockito.any(), Mockito.eq( + CommandAck.AckType.Cumulative), Mockito.any()); + + admin.topics().triggerCompaction(topicName); + + while (!pauseAck.get()) { + Thread.sleep(100); + } + + CompletableFuture currentCompaction = + (CompletableFuture) FieldUtils.readDeclaredField(topic, "currentCompaction", true); + CompletableFuture spyCurrentCompaction = spy(currentCompaction); + FieldUtils.writeDeclaredField(topic, "currentCompaction", spyCurrentCompaction, true); + currentCompaction.whenComplete((obj, throwable) -> { + if (throwable != null) { + spyCurrentCompaction.completeExceptionally(throwable); + } else { + spyCurrentCompaction.complete(obj); + } + }); + Mockito.doAnswer(invocationOnMock -> { + pauseAck.set(false); + return invocationOnMock.callRealMethod(); + }).when(spyCurrentCompaction).handle(Mockito.any()); + + admin.topics().delete(topicName, true); + + Awaitility.await().untilAsserted(() -> assertThrows(BKException.BKNoSuchLedgerExistsException.class, + () -> pulsar.getBookKeeperClient().openLedger( + compactedLedgerId.get(), BookKeeper.DigestType.CRC32, new byte[]{}))); + } }