Skip to content

Commit 41ef3f6

Browse files
TakaHiR07
and
fanjianye
authored
[fix][broker] Fix MessageDeduplication replay timeout causing topic loading to get stuck (apache#23004)
Co-authored-by: fanjianye <fanjianye@bigo.sg>
1 parent 8351c07 commit 41ef3f6

File tree

2 files changed

+114
-4
lines changed

2 files changed

+114
-4
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java

+10-4
Original file line number | Diff line number | Diff line change
@@ -159,11 +159,12 @@ private CompletableFuture<Void> recoverSequenceIdsMap() {
159159
log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries());
160160
CompletableFuture<Position> future = new CompletableFuture<>();
161161
replayCursor(future);
162-
return future.thenAccept(lastPosition -> {
162+
return future.thenCompose(lastPosition -> {
163163
if (lastPosition != null && snapshotCounter >= snapshotInterval) {
164164
snapshotCounter = 0;
165-
takeSnapshot(lastPosition);
165+
return takeSnapshot(lastPosition);
166166
}
167+
return CompletableFuture.completedFuture(null);
167168
});
168169
}
169170

@@ -438,13 +439,15 @@ public void resetHighestSequenceIdPushed() {
438439
}
439440
}
440441

441-
private void takeSnapshot(Position position) {
442+
private CompletableFuture<Void> takeSnapshot(Position position) {
443+
CompletableFuture<Void> future = new CompletableFuture<>();
442444
if (log.isDebugEnabled()) {
443445
log.debug("[{}] Taking snapshot of sequence ids map", topic.getName());
444446
}
445447

446448
if (!snapshotTaking.compareAndSet(false, true)) {
447-
return;
449+
future.complete(null);
450+
return future;
448451
}
449452

450453
Map<String, Long> snapshot = new TreeMap<>();
@@ -462,14 +465,17 @@ public void markDeleteComplete(Object ctx) {
462465
}
463466
lastSnapshotTimestamp = System.currentTimeMillis();
464467
snapshotTaking.set(false);
468+
future.complete(null);
465469
}
466470

467471
@Override
468472
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
469473
log.warn("[{}] Failed to store new deduplication snapshot at {}", topic.getName(), position);
470474
snapshotTaking.set(false);
475+
future.completeExceptionally(exception);
471476
}
472477
}, null);
478+
return future;
473479
}
474480

475481
/**

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java

+104
Original file line number | Diff line number | Diff line change
@@ -18,13 +18,16 @@
1818
*/
1919
package org.apache.pulsar.broker.service.persistent;
2020

21+
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME;
2122
import static org.testng.Assert.assertEquals;
2223
import static org.testng.Assert.assertFalse;
2324
import static org.testng.Assert.assertNotEquals;
2425
import static org.testng.Assert.assertNotNull;
2526
import static org.testng.Assert.assertNull;
2627
import static org.testng.Assert.assertTrue;
2728
import static org.testng.Assert.fail;
29+
30+
import java.lang.reflect.Field;
2831
import java.util.Optional;
2932
import java.util.UUID;
3033
import java.util.concurrent.CompletableFuture;
@@ -33,12 +36,18 @@
3336
import lombok.Cleanup;
3437
import org.apache.bookkeeper.mledger.ManagedCursor;
3538
import org.apache.bookkeeper.mledger.Position;
39+
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
40+
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
41+
import org.apache.pulsar.broker.BrokerTestUtil;
42+
import org.apache.pulsar.broker.service.BrokerService;
3643
import org.apache.pulsar.broker.service.Topic;
3744
import org.apache.pulsar.client.api.Producer;
3845
import org.apache.pulsar.client.api.ProducerConsumerBase;
3946
import org.apache.pulsar.client.api.Schema;
4047
import org.apache.pulsar.common.naming.TopicName;
48+
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
4149
import org.awaitility.Awaitility;
50+
import org.awaitility.reflect.WhiteboxImpl;
4251
import org.testng.Assert;
4352
import org.testng.annotations.AfterMethod;
4453
import org.testng.annotations.BeforeMethod;
@@ -529,6 +538,101 @@ public void testDisableNamespacePolicyTakeSnapshotShouldNotThrowException() thro
529538
persistentTopic.checkDeduplicationSnapshot();
530539
}
531540

541+
@Test
542+
public void testFinishTakeSnapshotWhenTopicLoading() throws Exception {
543+
cleanup();
544+
setup();
545+
546+
// Create a topic and wait until deduplication has started.
547+
int brokerDeduplicationEntriesInterval = 1000;
548+
pulsar.getConfiguration().setBrokerDeduplicationEnabled(true);
549+
pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(brokerDeduplicationEntriesInterval);
550+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
551+
admin.topics().createNonPartitionedTopic(topic);
552+
final PersistentTopic persistentTopic1 =
553+
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
554+
final ManagedLedgerImpl ml1 = (ManagedLedgerImpl) persistentTopic1.getManagedLedger();
555+
Awaitility.await().untilAsserted(() -> {
556+
ManagedCursorImpl cursor1 =
557+
(ManagedCursorImpl) ml1.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
558+
assertNotNull(cursor1);
559+
});
560+
final MessageDeduplication deduplication1 = persistentTopic1.getMessageDeduplication();
561+
562+
563+
// Send 999 messages, one fewer than "brokerDeduplicationEntriesInterval",
564+
// so takeSnapshot is not triggered.
565+
final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
566+
.topic(topic).enableBatching(false).create();
567+
for (int i = 0; i < brokerDeduplicationEntriesInterval - 1; i++) {
568+
producer.send(i + "");
569+
}
570+
producer.close();
571+
int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, "snapshotCounter");
572+
assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1);
573+
574+
575+
// Unload and reload the topic, simulating a topic load timeout.
576+
// Set brokerDeduplicationEntriesInterval to 10 so that takeSnapshot is triggered from
577+
// recoverSequenceIdsMap and should update the snapshot position.
578+
// However, if the topic close and takeSnapshot run concurrently,
579+
// takeSnapshot would throw an exception.
580+
admin.topics().unload(topic);
581+
pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(10);
582+
583+
// Make message deduplication recovery slower than topicLoadTimeoutSeconds.
584+
pulsar.getConfiguration().setTopicLoadTimeoutSeconds(1);
585+
String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" +
586+
TopicName.get(topic).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME;
587+
mockZooKeeper.delay(2 * 1000, (op, path) -> {
588+
if (mlPath.equals(path)) {
589+
return true;
590+
}
591+
return false;
592+
});
593+
594+
Field field2 = BrokerService.class.getDeclaredField("topics");
595+
field2.setAccessible(true);
596+
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
597+
(ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>)
598+
field2.get(pulsar.getBrokerService());
599+
600+
try {
601+
pulsar.getBrokerService().getTopic(topic, false).join().get();
602+
Assert.fail();
603+
} catch (Exception e) {
604+
// Topic loading should time out.
605+
}
606+
Awaitility.await().untilAsserted(() -> {
607+
// After the load timeout, the topic is closed and removed from the topics map.
608+
Assert.assertFalse(topics.containsKey(topic));
609+
});
610+
611+
612+
// Load the topic again with brokerDeduplicationEntriesInterval set to 10000,
613+
// so that recoverSequenceIdsMap does not trigger takeSnapshot.
614+
// In fact it should not need to replay again in recoverSequenceIdsMap,
615+
// since the previous topic load should have finished the replay process.
616+
pulsar.getConfiguration().setBrokerDeduplicationEntriesInterval(10000);
617+
pulsar.getConfiguration().setTopicLoadTimeoutSeconds(60);
618+
PersistentTopic persistentTopic2 =
619+
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
620+
ManagedLedgerImpl ml2 = (ManagedLedgerImpl) persistentTopic2.getManagedLedger();
621+
MessageDeduplication deduplication2 = persistentTopic2.getMessageDeduplication();
622+
623+
Awaitility.await().untilAsserted(() -> {
624+
int snapshotCounter3 = WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter");
625+
Assert.assertEquals(snapshotCounter3, 0);
626+
Assert.assertEquals(ml2.getLedgersInfo().size(), 1);
627+
});
628+
629+
630+
// cleanup.
631+
admin.topics().delete(topic);
632+
cleanup();
633+
setup();
634+
}
635+
532636
private void waitCacheInit(String topicName) throws Exception {
533637
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
534638
TopicName topic = TopicName.get(topicName);

0 commit comments

Comments
 (0)