Skip to content

Commit 6f9c933

Browse files
Vineethvineeth1995
Vineeth
authored and committed
[feat] [broker] PIP-188 support blue-green cluster migration [part-2]
1 parent 8c50a6c commit 6f9c933

File tree

9 files changed

+253
-44
lines changed

9 files changed

+253
-44
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java

+2
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,6 @@ default Optional<DispatchRateLimiter> getRateLimiter() {
4949
}
5050

5151
boolean isConnected();
52+
53+
/**
 * Returns the number of entries in this replicator's backlog.
 *
 * <p>In this change the persistent replicator delegates to its cursor's backlog count and the
 * non-persistent replicator always returns {@code 0}.
 *
 * @return the backlog entry count
 */
long getNumberOfEntriesInBacklog();
5254
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -1578,13 +1578,23 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ
15781578
if (ex.getCause() instanceof BrokerServiceException.TopicMigratedException) {
15791579
Optional<ClusterUrl> clusterURL = getMigratedClusterUrl(service.getPulsar());
15801580
if (clusterURL.isPresent()) {
1581-
log.info("[{}] redirect migrated producer to topic {}: producerId={}, {}", remoteAddress, topicName,
1582-
producerId, ex.getCause().getMessage());
1583-
commandSender.sendTopicMigrated(ResourceType.Producer, producerId,
1584-
clusterURL.get().getBrokerServiceUrl(), clusterURL.get().getBrokerServiceUrlTls());
1585-
closeProducer(producer);
1586-
return null;
1587-
1581+
if (topic.isReplicationBacklogExist()) {
1582+
log.info("Topic {} is migrated but replication backlog exist: "
1583+
+ "producerId = {}, producerName = {}, {}", topicName,
1584+
producerId, producerName, ex.getCause().getMessage());
1585+
} else {
1586+
log.info("[{}] redirect migrated producer to topic {}: "
1587+
+ "producerId={}, producerName = {}, {}", remoteAddress,
1588+
topicName, producerId, producerName, ex.getCause().getMessage());
1589+
boolean msgSent = commandSender.sendTopicMigrated(ResourceType.Producer, producerId,
1590+
clusterURL.get().getBrokerServiceUrl(), clusterURL.get().getBrokerServiceUrlTls());
1591+
if (!msgSent) {
1592+
log.info("client doesn't support topic migration handling {}-{}-{}", topic,
1593+
remoteAddress, producerId);
1594+
}
1595+
closeProducer(producer);
1596+
return null;
1597+
}
15881598
} else {
15891599
log.warn("[{}] failed producer because migration url not configured topic {}: producerId={}, {}",
15901600
remoteAddress, topicName, producerId, ex.getCause().getMessage());

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java

+2
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,8 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init
234234

235235
boolean isBrokerPublishRateExceeded();
236236

237+
/**
 * Reports whether this topic still has a replication backlog.
 *
 * <p>In this change the persistent topic returns {@code true} when any of its replicators reports
 * a non-zero backlog, and the non-persistent topic always returns {@code false}.
 *
 * @return {@code true} if a replication backlog exists, {@code false} otherwise
 */
boolean isReplicationBacklogExist();
238+
237239
void disableCnxAutoRead();
238240

239241
void enableCnxAutoRead();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ protected Position getReplicatorReadPosition() {
248248
}
249249

250250
@Override
251-
protected long getNumberOfEntriesInBacklog() {
251+
public long getNumberOfEntriesInBacklog() {
252252
// No-op
253253
return 0;
254254
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java

+5
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,11 @@ public void checkMessageDeduplicationInfo() {
236236
// No-op
237237
}
238238

239+
/**
 * Always reports that there is no replication backlog for this topic.
 *
 * @return {@code false} unconditionally
 */
@Override
240+
public boolean isReplicationBacklogExist() {
241+
return false;
242+
}
243+
239244
@Override
240245
public void removeProducer(Producer producer) {
241246
checkArgument(producer.getTopic() == this);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ protected Position getReplicatorReadPosition() {
165165
}
166166

167167
@Override
168-
protected long getNumberOfEntriesInBacklog() {
169-
return cursor.getNumberOfEntriesInBacklog(false);
168+
public long getNumberOfEntriesInBacklog() {
169+
return cursor.getNumberOfEntriesInBacklog(true);
170170
}
171171

172172
@Override

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,7 @@ public void addComplete(Position pos, ByteBuf entryData, Object ctx) {
575575

576576
@Override
577577
public synchronized void addFailed(ManagedLedgerException exception, Object ctx) {
578+
PublishContext callback = (PublishContext) ctx;
578579
if (exception instanceof ManagedLedgerFencedException) {
579580
// If the managed ledger has been fenced, we cannot continue using it. We need to close and reopen
580581
close();
@@ -587,7 +588,11 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx)
587588
List<CompletableFuture<Void>> futures = new ArrayList<>();
588589
// send migration url metadata to producers before disconnecting them
589590
if (isMigrated()) {
590-
producers.forEach((__, producer) -> producer.topicMigrated(getMigratedClusterUrl()));
591+
if (isReplicationBacklogExist()) {
592+
log.info("Topic {} is migrated but replication backlog exists. Closing producers.", topic);
593+
} else {
594+
producers.forEach((__, producer) -> producer.topicMigrated(getMigratedClusterUrl()));
595+
}
591596
}
592597
producers.forEach((__, producer) -> futures.add(producer.disconnect()));
593598
disconnectProducersFuture = FutureUtil.waitForAll(futures);
@@ -599,8 +604,6 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx)
599604
return null;
600605
});
601606

602-
PublishContext callback = (PublishContext) ctx;
603-
604607
if (exception instanceof ManagedLedgerAlreadyClosedException) {
605608
if (log.isDebugEnabled()) {
606609
log.debug("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
@@ -2510,6 +2513,18 @@ public CompletableFuture<Void> checkClusterMigration() {
25102513
}
25112514
}
25122515

2516+
/**
 * Reports whether any replicator of this topic still has entries in its backlog.
 *
 * @return {@code true} if at least one replicator returned by {@code getReplicators()} reports a
 *         non-zero backlog; {@code false} if the replicator map is {@code null}, empty, or every
 *         replicator reports zero
 */
public boolean isReplicationBacklogExist() {
2517+
ConcurrentOpenHashMap<String, Replicator> replicators = getReplicators();
2518+
if (replicators != null) {
2519+
for (Replicator replicator : replicators.values()) {
2520+
if (replicator.getNumberOfEntriesInBacklog() != 0) {
2521+
// One non-empty backlog is enough to answer; stop scanning the remaining replicators.
return true;
2522+
}
2523+
}
2524+
}
2525+
return false;
2526+
}
2527+
25132528
@Override
25142529
public void checkGC() {
25152530
if (!isDeleteWhileInactive()) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -112,14 +112,20 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
112112
protected URI lookupUrl;
113113

114114
protected boolean isTcpLookup = false;
115-
protected static final String configClusterName = "test";
115+
protected String configClusterName = "test";
116116

117117
protected boolean enableBrokerInterceptor = false;
118118

119119
public MockedPulsarServiceBaseTest() {
120120
resetConfig();
121121
}
122122

123+
/**
 * Applies the given cluster name to the broker configuration and to {@code configClusterName},
 * then runs {@code internalSetup()}.
 *
 * @param clusterName the cluster name to set before setup runs
 * @throws Exception if {@code internalSetup()} (or the configuration update) fails
 */
protected void setupWithClusterName(String clusterName) throws Exception {
124+
this.conf.setClusterName(clusterName);
125+
this.configClusterName = clusterName;
126+
this.internalSetup();
127+
}
128+
123129
protected PulsarService getPulsar() {
124130
return pulsar;
125131
}

0 commit comments

Comments
 (0)