Skip to content

Commit

Permalink
[fix][broker] Can't connecte to non-persist topic when enable broker …
Browse files Browse the repository at this point in the history
…client tls (apache#22991)
  • Loading branch information
shibd authored Jul 3, 2024
1 parent f4d1d05 commit deb26f7
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1471,7 +1471,15 @@ public CompletableFuture<Boolean> checkNonPersistentNonPartitionedTopicExists(St
return FutureUtil.failedFuture(new ServiceUnitNotReadyException(
"No broker was available to own " + topicName));
}
return pulsarClient.getLookup(lookupResult.get().getLookupData().getBrokerUrl())
LookupData lookupData = lookupResult.get().getLookupData();
String brokerUrl;
if (pulsar.getConfiguration().isBrokerClientTlsEnabled()
&& StringUtils.isNotEmpty(lookupData.getBrokerUrlTls())) {
brokerUrl = lookupData.getBrokerUrlTls();
} else {
brokerUrl = lookupData.getBrokerUrl();
}
return pulsarClient.getLookup(brokerUrl)
.getPartitionedTopicMetadata(topicName, false)
.thenApply(metadata -> true)
.exceptionallyCompose(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
Expand All @@ -32,6 +34,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.crypto.SecretKey;
import java.nio.charset.StandardCharsets;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
Expand Down Expand Up @@ -107,6 +110,7 @@ protected void internalSetUpForBroker() {
conf.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName()));
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
conf.setBrokerClientTlsEnabled(true);
conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+ Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
}
Expand All @@ -132,6 +136,29 @@ private PulsarAdmin getAdmin(String token) throws Exception {
return clientBuilder.build();
}

@Test
public void testNonPersistentTopic() throws Exception {

@Cleanup
PulsarClient pulsarClient = getClient(ADMIN_TOKEN);

String topic = "non-persistent://" + namespaceName + "/test-token-non-persistent";

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("test").subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
byte[] msg = "Hello".getBytes(StandardCharsets.UTF_8);
producer.send(msg);

Message<byte[]> receive = consumer.receive(3, TimeUnit.SECONDS);
assertNotNull(receive);
assertEquals(receive.getData(), msg);
}

@Test
public void testTokenExpirationProduceConsumer() throws Exception {
Calendar calendar = Calendar.getInstance();
Expand Down

0 comments on commit deb26f7

Please sign in to comment.