Skip to content

Commit c19ebf3

Browse files
rdhabaliavisxu
authored andcommitted
[fix][broker] Fix Broker migration NPE while broker tls url not configured (apache#23534)
1 parent 2d3c3c6 commit c19ebf3

File tree

4 files changed

+18
-11
lines changed

4 files changed

+18
-11
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1363,8 +1363,8 @@ public static CompletableFuture<Optional<ClusterUrl>> getMigratedClusterUrlAsync
13631363
.getClusterPoliciesAsync(pulsar.getConfig().getClusterName())
13641364
.thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic),
13651365
((clusterData, isNamespaceMigrationEnabled) -> {
1366-
Optional<ClusterUrl> url = ((clusterData.isPresent() && clusterData.get().isMigrated())
1367-
|| isNamespaceMigrationEnabled)
1366+
Optional<ClusterUrl> url = (clusterData.isPresent() && (clusterData.get().isMigrated()
1367+
|| isNamespaceMigrationEnabled))
13681368
? Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
13691369
: Optional.empty();
13701370
return url;

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ public void testClusterMigration() throws Exception {
297297
assertFalse(topic2.getProducers().isEmpty());
298298

299299
ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
300-
pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
300+
pulsar2.getBrokerServiceUrl(), null);
301301
admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
302302
assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(), migratedUrl);
303303

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -710,9 +710,12 @@ protected void handleReachedEndOfTopic(CommandReachedEndOfTopic commandReachedEn
710710
@Override
711711
protected void handleTopicMigrated(CommandTopicMigrated commandTopicMigrated) {
712712
final long resourceId = commandTopicMigrated.getResourceId();
713-
final String serviceUrl = commandTopicMigrated.getBrokerServiceUrl();
714-
final String serviceUrlTls = commandTopicMigrated.getBrokerServiceUrlTls();
715-
713+
final String serviceUrl = commandTopicMigrated.hasBrokerServiceUrl()
714+
? commandTopicMigrated.getBrokerServiceUrl()
715+
: null;
716+
final String serviceUrlTls = commandTopicMigrated.hasBrokerServiceUrlTls()
717+
? commandTopicMigrated.getBrokerServiceUrlTls()
718+
: null;
716719
HandlerState resource = commandTopicMigrated.getResourceType() == ResourceType.Producer
717720
? producers.get(resourceId)
718721
: consumers.get(resourceId);

pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
8989
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
9090
import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
91+
import org.apache.pulsar.common.api.proto.CommandTopicMigrated;
9192
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
9293
import org.apache.pulsar.common.api.proto.FeatureFlags;
9394
import org.apache.pulsar.common.api.proto.IntRange;
@@ -768,11 +769,14 @@ public static ByteBuf newReachedEndOfTopic(long consumerId) {
768769

769770
public static ByteBuf newTopicMigrated(ResourceType type, long resourceId, String brokerUrl, String brokerUrlTls) {
770771
BaseCommand cmd = localCmd(Type.TOPIC_MIGRATED);
771-
cmd.setTopicMigrated()
772-
.setResourceType(type)
773-
.setResourceId(resourceId)
774-
.setBrokerServiceUrl(brokerUrl)
775-
.setBrokerServiceUrlTls(brokerUrlTls);
772+
CommandTopicMigrated migratedCmd = cmd.setTopicMigrated();
773+
migratedCmd.setResourceType(type).setResourceId(resourceId);
774+
if (StringUtils.isNotBlank(brokerUrl)) {
775+
migratedCmd.setBrokerServiceUrl(brokerUrl);
776+
}
777+
if (StringUtils.isNotBlank(brokerUrlTls)) {
778+
migratedCmd.setBrokerServiceUrlTls(brokerUrlTls);
779+
}
776780
return serializeWithSize(cmd);
777781
}
778782

0 commit comments

Comments
 (0)