Skip to content

Commit ad831d1

Browse files
heesung-sn RobertIndie
authored and committed
[improve][broker] Fix ServiceUnitStateCompactionStrategy to cover fast-forward cursor behavior after compaction (#20110)
Master Issue: #16691 ### Motivation Raising a PR to implement #16691. After compaction, the cursor can fast-forward to the compacted horizon when a large number of messages are compacted before the next read, so ServiceUnitStateCompactionStrategy also needs to cover this case. Currently, existing table views that are slow (i.e., whose states are far behind) and use ServiceUnitStateCompactionStrategy cannot accept those compacted messages. In the load balance extension context, this means the ownership data could be inconsistent among brokers. ### Modifications This PR: - fixes ServiceUnitStateCompactionStrategy to accept the state data if its version is greater than the current version + 1; - (minor fix) no longer repeatedly updates replication_clusters in the policies when creating the system namespace, since that update redundantly triggers ZK watchers when brokers restart; - sets closeWithoutWaitingClientDisconnect=true upon unload (following the same setting as the modular LM's). (cherry picked from commit 6cfa468)
1 parent a92defc commit ad831d1

File tree

8 files changed

+114
-9
lines changed

8 files changed

+114
-9
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -385,10 +385,19 @@ static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName nam
385385
namespaceResources.createPolicies(namespaceName, policies);
386386
} else {
387387
log.info("Namespace {} already exists.", namespaceName);
388-
namespaceResources.setPolicies(namespaceName, policies -> {
389-
policies.replication_clusters.add(cluster);
390-
return policies;
391-
});
388+
var replicaClusterFound = false;
389+
var policiesOptional = namespaceResources.getPolicies(namespaceName);
390+
if (policiesOptional.isPresent() && policiesOptional.get().replication_clusters.contains(cluster)) {
391+
replicaClusterFound = true;
392+
}
393+
if (!replicaClusterFound) {
394+
namespaceResources.setPolicies(namespaceName, policies -> {
395+
policies.replication_clusters.add(cluster);
396+
return policies;
397+
});
398+
log.info("Updated namespace:{} policies. Added the replication cluster:{}",
399+
namespaceName, cluster);
400+
}
392401
}
393402
}
394403

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -819,7 +819,7 @@ private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
819819
NamespaceBundle bundle = getNamespaceBundle(serviceUnit);
820820
return pulsar.getBrokerService().unloadServiceUnit(
821821
bundle,
822-
false,
822+
true,
823823
pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(),
824824
TimeUnit.MILLISECONDS)
825825
.thenApply(numUnloadedTopics -> {

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,13 @@ public boolean shouldKeepLeft(ServiceUnitStateData from, ServiceUnitStateData to
5252
return false;
5353
}
5454

55-
// Skip the compaction case where from = null and to.versionId > 1
56-
if (from != null && from.versionId() + 1 != to.versionId()) {
57-
return true;
55+
if (from != null) {
56+
if (from.versionId() == Long.MAX_VALUE && to.versionId() == Long.MIN_VALUE) { // overflow
57+
} else if (from.versionId() >= to.versionId()) {
58+
return true;
59+
} else if (from.versionId() < to.versionId() - 1) { // Compacted
60+
return false;
61+
} // else from.versionId() == to.versionId() - 1 // continue to check further
5862
}
5963

6064
if (to.force()) {

pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ private <T> CompletableFuture<Long> runPhaseTwo(
379379
});
380380
})
381381
.thenCompose(v -> {
382-
log.info("Acking ledger id {}", phaseOneResult.firstId);
382+
log.info("Acking ledger id {}", phaseOneResult.lastId);
383383
return ((CompactionReaderImpl<T>) reader)
384384
.acknowledgeCumulativeAsync(
385385
phaseOneResult.lastId, Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY,

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,8 @@ public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws Except
592592
restartBroker();
593593
pulsar1 = pulsar;
594594
setPrimaryLoadManager();
595+
admin.namespaces().setNamespaceReplicationClusters("public/default",
596+
Sets.newHashSet(this.conf.getClusterName()));
595597

596598
var serviceUnitStateChannelPrimaryNew =
597599
(ServiceUnitStateChannelImpl) FieldUtils.readDeclaredField(primaryLoadManager,

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ public void testVersionId(){
8585
new ServiceUnitStateData(Owned, dst, src, 10),
8686
new ServiceUnitStateData(Releasing, "broker2", dst, 5)));
8787

88+
assertFalse(strategy.shouldKeepLeft(
89+
new ServiceUnitStateData(Owned, dst, src, 10),
90+
new ServiceUnitStateData(Owned, "broker2", dst, 12)));
91+
8892
}
8993

9094
@Test

pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java

+79
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isValidTransition;
2828
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE;
2929
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
30+
import static org.mockito.Mockito.doAnswer;
31+
import static org.mockito.Mockito.reset;
32+
import static org.mockito.Mockito.spy;
3033
import static org.testng.Assert.assertEquals;
3134
import static org.testng.Assert.assertNotNull;
3235
import static org.testng.Assert.assertNull;
@@ -49,6 +52,7 @@
4952
import java.util.concurrent.Semaphore;
5053
import java.util.concurrent.TimeUnit;
5154
import java.util.concurrent.atomic.AtomicBoolean;
55+
import java.util.concurrent.atomic.AtomicInteger;
5256
import java.util.stream.Collectors;
5357
import org.apache.bookkeeper.client.BookKeeper;
5458
import org.apache.commons.lang.reflect.FieldUtils;
@@ -69,6 +73,7 @@
6973
import org.apache.pulsar.client.api.Reader;
7074
import org.apache.pulsar.client.api.Schema;
7175
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
76+
import org.apache.pulsar.client.impl.ReaderImpl;
7277
import org.apache.pulsar.common.policies.data.ClusterData;
7378
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
7479
import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -628,6 +633,80 @@ public void testSlowTableviewAfterCompaction() throws Exception {
628633

629634
}
630635

636+
@Test
637+
public void testSlowReceiveTableviewAfterCompaction() throws Exception {
638+
String topic = "persistent://my-property/use/my-ns/my-topic1";
639+
String strategyClassName = "topicCompactionStrategyClassName";
640+
641+
pulsarClient.newConsumer(schema)
642+
.topic(topic)
643+
.subscriptionName("sub1")
644+
.readCompacted(true)
645+
.subscribe().close();
646+
647+
var tv = pulsar.getClient().newTableViewBuilder(schema)
648+
.topic(topic)
649+
.subscriptionName("slowTV")
650+
.loadConf(Map.of(
651+
strategyClassName,
652+
ServiceUnitStateCompactionStrategy.class.getName()))
653+
.create();
654+
655+
// Configure retention to ensure data is retained for reader
656+
admin.namespaces().setRetention("my-property/use/my-ns",
657+
new RetentionPolicies(-1, -1));
658+
659+
Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
660+
.topic(topic)
661+
.compressionType(MSG_COMPRESSION_TYPE)
662+
.enableBatching(true)
663+
.messageRoutingMode(MessageRoutingMode.SinglePartition)
664+
.create();
665+
666+
StrategicTwoPhaseCompactor compactor
667+
= new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
668+
669+
var reader = ((CompletableFuture<ReaderImpl<ServiceUnitStateData>>) FieldUtils
670+
.readDeclaredField(tv, "reader", true)).get();
671+
var consumer = spy(reader.getConsumer());
672+
FieldUtils.writeDeclaredField(reader, "consumer", consumer, true);
673+
String bundle = "bundle1";
674+
final AtomicInteger versionId = new AtomicInteger(0);
675+
final AtomicInteger cnt = new AtomicInteger(1);
676+
int msgAddCount = 1000; // has to be big enough to cover compacted cursor fast-forward.
677+
doAnswer(invocationOnMock -> {
678+
if (cnt.decrementAndGet() == 0) {
679+
var msg = consumer.receiveAsync();
680+
for (int i = 0; i < msgAddCount; i++) {
681+
producer.newMessage().key(bundle).value(
682+
new ServiceUnitStateData(Owned, "broker" + versionId.incrementAndGet(), true,
683+
versionId.get())).send();
684+
}
685+
compactor.compact(topic, strategy).join();
686+
return msg;
687+
}
688+
// Call the real method
689+
reset(consumer);
690+
return consumer.receiveAsync();
691+
}).when(consumer).receiveAsync();
692+
producer.newMessage().key(bundle).value(
693+
new ServiceUnitStateData(Owned, "broker", true,
694+
versionId.incrementAndGet())).send();
695+
producer.newMessage().key(bundle).value(
696+
new ServiceUnitStateData(Owned, "broker" + versionId.incrementAndGet(), true,
697+
versionId.get())).send();
698+
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
699+
() -> {
700+
var val = tv.get(bundle);
701+
assertNotNull(val);
702+
assertEquals(val.dstBroker(), "broker" + versionId.get());
703+
}
704+
);
705+
706+
producer.close();
707+
tv.close();
708+
}
709+
631710
@Test
632711
public void testBrokerRestartAfterCompaction() throws Exception {
633712
String topic = "persistent://my-property/use/my-ns/my-topic1";

pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java

+7
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,13 @@ private void handleMessage(Message<T> msg) {
192192
if (compactionStrategy != null) {
193193
T prev = data.get(key);
194194
update = !compactionStrategy.shouldKeepLeft(prev, cur);
195+
if (!update) {
196+
log.info("Skipped the message from topic {}. key={} value={} prev={}",
197+
conf.getTopicName(),
198+
key,
199+
cur,
200+
prev);
201+
}
195202
}
196203

197204
if (update) {

0 commit comments

Comments
 (0)