Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 3ddeb0c

Browse files
committedSep 29, 2022
[fix][proxy] Fix refresh client auth
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
1 parent 08df28a commit 3ddeb0c

File tree

6 files changed

+343
-27
lines changed

6 files changed

+343
-27
lines changed
 

‎pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java

+14-3
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@
104104
public class ClientCnx extends PulsarHandler {
105105

106106
protected final Authentication authentication;
107-
private State state;
107+
protected State state;
108108

109109
@Getter
110110
private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
@@ -155,7 +155,7 @@ public class ClientCnx extends PulsarHandler {
155155

156156
private final int maxNumberOfRejectedRequestPerConnection;
157157
private final int rejectedRequestResetTimeSec = 60;
158-
private final int protocolVersion;
158+
protected final int protocolVersion;
159159
private final long operationTimeoutMs;
160160

161161
protected String proxyToTargetBrokerAddress = null;
@@ -176,7 +176,10 @@ public class ClientCnx extends PulsarHandler {
176176
@Getter
177177
private final ClientCnxIdleState idleState;
178178

179-
enum State {
179+
@Getter
180+
private long lastDisconnectedTimestamp;
181+
182+
protected enum State {
180183
None, SentConnectFrame, Ready, Failed, Connecting
181184
}
182185

@@ -281,6 +284,7 @@ protected ByteBuf newConnectCommand() throws Exception {
281284
@Override
282285
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
283286
super.channelInactive(ctx);
287+
lastDisconnectedTimestamp = System.currentTimeMillis();
284288
log.info("{} Disconnected", ctx.channel());
285289
if (!connectionFuture.isDone()) {
286290
connectionFuture.completeExceptionally(new PulsarClientException("Connection already closed"));
@@ -1243,6 +1247,13 @@ public void close() {
12431247
}
12441248
}
12451249

1250+
protected void closeWithException(Throwable e) {
1251+
if (ctx != null) {
1252+
connectionFuture.completeExceptionally(e);
1253+
ctx.close();
1254+
}
1255+
}
1256+
12461257
private void checkRequestTimeout() {
12471258
while (!requestTimeoutQueue.isEmpty()) {
12481259
RequestTime request = requestTimeoutQueue.peek();

‎pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,19 @@
3535
import java.net.URI;
3636
import java.net.URISyntaxException;
3737
import java.util.ArrayList;
38+
import java.util.Collections;
3839
import java.util.Iterator;
3940
import java.util.List;
4041
import java.util.Map;
4142
import java.util.Optional;
4243
import java.util.Random;
44+
import java.util.Set;
4345
import java.util.concurrent.CompletableFuture;
4446
import java.util.concurrent.ConcurrentHashMap;
4547
import java.util.concurrent.ConcurrentMap;
4648
import java.util.concurrent.TimeUnit;
4749
import java.util.function.Supplier;
50+
import java.util.stream.Collectors;
4851
import org.apache.commons.lang3.StringUtils;
4952
import org.apache.pulsar.client.api.PulsarClientException;
5053
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
@@ -453,5 +456,9 @@ public void doMarkAndReleaseUselessConnections(){
453456
// Do release idle connections.
454457
releaseIdleConnectionTaskList.forEach(Runnable::run);
455458
}
456-
}
457459

460+
public Set<CompletableFuture<ClientCnx>> getConnections() {
461+
return Collections.unmodifiableSet(
462+
pool.values().stream().flatMap(n -> n.values().stream()).collect(Collectors.toSet()));
463+
}
464+
}

‎pulsar-proxy/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,11 @@
174174
<artifactId>ipaddress</artifactId>
175175
<version>${seancfoley.ipaddress.version}</version>
176176
</dependency>
177+
<dependency>
178+
<groupId>org.awaitility</groupId>
179+
<artifactId>awaitility</artifactId>
180+
<scope>test</scope>
181+
</dependency>
177182
</dependencies>
178183
<build>
179184
<plugins>

‎pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java

+48-13
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,42 @@
1818
*/
1919
package org.apache.pulsar.proxy.server;
2020

21+
import static com.google.common.base.Preconditions.checkArgument;
2122
import io.netty.buffer.ByteBuf;
2223
import io.netty.channel.EventLoopGroup;
24+
import java.util.Arrays;
25+
import lombok.extern.slf4j.Slf4j;
2326
import org.apache.pulsar.PulsarVersion;
2427
import org.apache.pulsar.client.impl.ClientCnx;
2528
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
2629
import org.apache.pulsar.common.api.AuthData;
30+
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
2731
import org.apache.pulsar.common.protocol.Commands;
28-
import org.slf4j.Logger;
29-
import org.slf4j.LoggerFactory;
3032

33+
@Slf4j
3134
public class ProxyClientCnx extends ClientCnx {
35+
private final boolean forwardClientAuthData;
36+
private final String clientAuthMethod;
37+
private final String clientAuthRole;
38+
private final AuthData clientAuthData;
39+
private final Runnable refreshClientAuthDataNotifier;
3240

33-
String clientAuthRole;
34-
AuthData clientAuthData;
35-
String clientAuthMethod;
36-
int protocolVersion;
37-
41+
@Deprecated
3842
public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
3943
AuthData clientAuthData, String clientAuthMethod, int protocolVersion) {
40-
super(conf, eventLoopGroup);
44+
this(conf, eventLoopGroup, clientAuthRole, clientAuthData, null,
45+
clientAuthMethod, protocolVersion, clientAuthData != null);
46+
}
47+
48+
public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
49+
AuthData clientAuthData, Runnable refreshClientAuthDataNotifier,
50+
String clientAuthMethod, int protocolVersion, boolean forwardClientAuthData) {
51+
super(conf, eventLoopGroup, protocolVersion);
4152
this.clientAuthRole = clientAuthRole;
4253
this.clientAuthData = clientAuthData;
54+
this.refreshClientAuthDataNotifier = refreshClientAuthDataNotifier;
4355
this.clientAuthMethod = clientAuthMethod;
44-
this.protocolVersion = protocolVersion;
56+
this.forwardClientAuthData = forwardClientAuthData;
4557
}
4658

4759
@Override
@@ -54,10 +66,33 @@ protected ByteBuf newConnectCommand() throws Exception {
5466

5567
authenticationDataProvider = authentication.getAuthData(remoteHostName);
5668
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
57-
return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
58-
PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
59-
clientAuthMethod);
69+
return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
70+
PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
71+
clientAuthMethod);
6072
}
6173

62-
private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
74+
@Override
75+
protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
76+
checkArgument(authChallenge.hasChallenge());
77+
checkArgument(authChallenge.getChallenge().hasAuthData());
78+
79+
boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
80+
if (!forwardClientAuthData || !isRefresh || refreshClientAuthDataNotifier == null) {
81+
super.handleAuthChallenge(authChallenge);
82+
return;
83+
}
84+
85+
try {
86+
if (log.isDebugEnabled()) {
87+
log.debug("{} Request to refresh the original client authentication data", ctx.channel());
88+
}
89+
refreshClientAuthDataNotifier.run();
90+
if (state == State.SentConnectFrame) {
91+
state = State.Connecting;
92+
}
93+
} catch (Exception e) {
94+
log.error("{} Failed to send the auth challenge command: {}", ctx.channel(), e);
95+
closeWithException(e);
96+
}
97+
}
6398
}

‎pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java

+80-10
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import javax.naming.AuthenticationException;
4646
import javax.net.ssl.SSLSession;
4747
import lombok.Getter;
48+
import org.apache.pulsar.PulsarVersion;
4849
import org.apache.pulsar.broker.PulsarServerException;
4950
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
5051
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
@@ -316,12 +317,13 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie
316317
this.clientAuthData = clientData;
317318
this.clientAuthMethod = authMethod;
318319
}
319-
clientCnxSupplier =
320-
() -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
321-
clientAuthMethod, protocolVersionToAdvertise);
320+
clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole,
321+
clientAuthData,
322+
this::requestRefreshClientAuthData,
323+
clientAuthMethod, protocolVersionToAdvertise,
324+
service.getConfiguration().isForwardAuthorizationCredentials());
322325
} else {
323-
clientCnxSupplier =
324-
() -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
326+
clientCnxSupplier = () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
325327
}
326328

327329
if (this.connectionPool == null) {
@@ -423,16 +425,22 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec
423425
}
424426

425427
// According to auth result, send newConnected or newAuthChallenge command.
426-
private void doAuthentication(AuthData clientData) throws Exception {
428+
private void doAuthentication(AuthData clientData)
429+
throws Exception {
427430
AuthData brokerData = authState.authenticate(clientData);
428431
// authentication has completed, will send newConnected command.
429432
if (authState.isComplete()) {
430433
clientAuthRole = authState.getAuthRole();
431434
if (LOG.isDebugEnabled()) {
432435
LOG.debug("[{}] Client successfully authenticated with {} role {}",
433-
remoteAddress, authMethod, clientAuthRole);
436+
remoteAddress, authMethod, clientAuthRole);
437+
}
438+
439+
// First connection
440+
if (this.connectionPool == null || state == State.Connecting) {
441+
// authentication has completed, will send newConnected command.
442+
completeConnect(clientData);
434443
}
435-
completeConnect(clientData);
436444
return;
437445
}
438446

@@ -441,7 +449,7 @@ private void doAuthentication(AuthData clientData) throws Exception {
441449
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
442450
if (LOG.isDebugEnabled()) {
443451
LOG.debug("[{}] Authentication in progress client by method {}.",
444-
remoteAddress, authMethod);
452+
remoteAddress, authMethod);
445453
}
446454
state = State.Connecting;
447455
}
@@ -523,7 +531,6 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
523531

524532
@Override
525533
protected void handleAuthResponse(CommandAuthResponse authResponse) {
526-
checkArgument(state == State.Connecting);
527534
checkArgument(authResponse.hasResponse());
528535
checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName());
529536

@@ -535,6 +542,52 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
535542
try {
536543
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
537544
doAuthentication(clientData);
545+
if (service.getConfiguration().isForwardAuthorizationCredentials()
546+
&& connectionPool != null && state == State.ProxyLookupRequests) {
547+
connectionPool.getConnections().forEach(toBrokerCnxFuture -> {
548+
String clientVersion;
549+
if (authResponse.hasClientVersion()) {
550+
clientVersion = authResponse.getClientVersion();
551+
} else {
552+
clientVersion = PulsarVersion.getVersion();
553+
}
554+
int protocolVersion;
555+
if (authResponse.hasProtocolVersion()) {
556+
protocolVersion = authResponse.getProtocolVersion();
557+
} else {
558+
protocolVersion = Commands.getCurrentProtocolVersion();
559+
}
560+
561+
ByteBuf cmd =
562+
Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
563+
toBrokerCnxFuture.thenAccept(toBrokerCnx -> toBrokerCnx.ctx().writeAndFlush(cmd)
564+
.addListener(writeFuture -> {
565+
if (writeFuture.isSuccess()) {
566+
if (LOG.isDebugEnabled()) {
567+
LOG.debug("{} authentication is refreshed successfully by {}, "
568+
+ "auth method: {} ",
569+
toBrokerCnx.ctx().channel(), ctx.channel(), clientAuthMethod);
570+
}
571+
} else {
572+
LOG.error("Failed to forward the auth response command "
573+
+ "from the proxy to the broker through the proxy client, "
574+
+ "proxy: {}, proxy client: {}",
575+
ctx.channel(),
576+
toBrokerCnx.ctx().channel(),
577+
writeFuture.cause());
578+
toBrokerCnx.ctx().channel().pipeline()
579+
.fireExceptionCaught(writeFuture.cause());
580+
}
581+
}))
582+
.whenComplete((__, ex) -> {
583+
if (ex != null) {
584+
LOG.error("Failed to forward the auth response command from the proxy to " +
585+
"the broker through the proxy client, proxy: {}",
586+
ctx().channel(), ex);
587+
}
588+
});
589+
});
590+
}
538591
} catch (Exception e) {
539592
String msg = "Unable to handleAuthResponse";
540593
LOG.warn("[{}] {} ", remoteAddress, msg, e);
@@ -543,6 +596,23 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
543596
}
544597
}
545598

599+
private void requestRefreshClientAuthData() {
600+
ctx.writeAndFlush(Commands.newAuthChallenge(clientAuthMethod, AuthData.REFRESH_AUTH_DATA,
601+
protocolVersionToAdvertise))
602+
.addListener(writeFuture -> {
603+
if (writeFuture.isSuccess()) {
604+
if (LOG.isDebugEnabled()) {
605+
LOG.debug("{} Sent auth challenge to client to refresh credentials with method: {}",
606+
ctx.channel(), clientAuthMethod);
607+
}
608+
} else {
609+
LOG.error("{} Failed to send request for mutual auth to client", ctx.channel(),
610+
writeFuture.cause());
611+
ctx.channel().pipeline().fireExceptionCaught(writeFuture.cause());
612+
}
613+
});
614+
}
615+
546616
@Override
547617
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
548618
checkArgument(state == State.ProxyLookupRequests);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.proxy.server;
20+
21+
import static java.util.concurrent.TimeUnit.SECONDS;
22+
import static org.mockito.Mockito.spy;
23+
import static org.testng.Assert.assertNotNull;
24+
import static org.testng.Assert.assertTrue;
25+
import com.google.common.collect.Sets;
26+
import io.jsonwebtoken.SignatureAlgorithm;
27+
import java.util.Calendar;
28+
import java.util.Collections;
29+
import java.util.HashSet;
30+
import java.util.Optional;
31+
import java.util.Properties;
32+
import java.util.Set;
33+
import java.util.concurrent.CompletableFuture;
34+
import java.util.stream.Collectors;
35+
import javax.crypto.SecretKey;
36+
import lombok.Cleanup;
37+
import lombok.extern.slf4j.Slf4j;
38+
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
39+
import org.apache.pulsar.broker.authentication.AuthenticationService;
40+
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
41+
import org.apache.pulsar.client.admin.PulsarAdmin;
42+
import org.apache.pulsar.client.api.Producer;
43+
import org.apache.pulsar.client.api.ProducerConsumerBase;
44+
import org.apache.pulsar.client.api.PulsarClient;
45+
import org.apache.pulsar.client.impl.ClientCnx;
46+
import org.apache.pulsar.client.impl.PulsarClientImpl;
47+
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
48+
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
49+
import org.apache.pulsar.common.policies.data.ClusterData;
50+
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
51+
import org.awaitility.Awaitility;
52+
import org.mockito.Mockito;
53+
import org.testng.annotations.AfterClass;
54+
import org.testng.annotations.BeforeClass;
55+
import org.testng.annotations.DataProvider;
56+
import org.testng.annotations.Test;
57+
58+
@Slf4j
59+
public class ProxyRefreshAuthTest extends ProducerConsumerBase {
60+
private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
61+
62+
private ProxyService proxyService;
63+
private final ProxyConfiguration proxyConfig = new ProxyConfiguration();
64+
65+
@Override
66+
protected void doInitConf() throws Exception {
67+
super.doInitConf();
68+
69+
// enable tls and auth&auth at broker
70+
conf.setAuthenticationEnabled(true);
71+
conf.setAuthorizationEnabled(false);
72+
conf.setTopicLevelPoliciesEnabled(false);
73+
conf.setProxyRoles(Collections.singleton("Proxy"));
74+
conf.setAdvertisedAddress(null);
75+
conf.setAuthenticateOriginalAuthData(true);
76+
conf.setBrokerServicePort(Optional.of(0));
77+
conf.setWebServicePort(Optional.of(0));
78+
79+
Set<String> superUserRoles = new HashSet<>();
80+
superUserRoles.add("superUser");
81+
conf.setSuperUserRoles(superUserRoles);
82+
83+
conf.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
84+
Properties properties = new Properties();
85+
properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
86+
conf.setProperties(properties);
87+
88+
conf.setClusterName("proxy-authorization");
89+
conf.setNumExecutorThreadPoolSize(5);
90+
91+
conf.setAuthenticationRefreshCheckSeconds(1);
92+
}
93+
94+
@BeforeClass
95+
@Override
96+
protected void setup() throws Exception {
97+
super.init();
98+
99+
admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddress())
100+
.authentication(new AuthenticationToken(
101+
() -> AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.empty()))).build();
102+
String namespaceName = "my-tenant/my-ns";
103+
admin.clusters().createCluster("proxy-authorization",
104+
ClusterData.builder().serviceUrlTls(brokerUrlTls.toString()).build());
105+
admin.tenants().createTenant("my-tenant",
106+
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
107+
admin.namespaces().createNamespace(namespaceName);
108+
109+
// start proxy service
110+
proxyConfig.setAuthenticationEnabled(true);
111+
proxyConfig.setAuthorizationEnabled(false);
112+
proxyConfig.setForwardAuthorizationCredentials(true);
113+
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
114+
proxyConfig.setAdvertisedAddress(null);
115+
116+
proxyConfig.setServicePort(Optional.of(0));
117+
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
118+
proxyConfig.setWebServicePort(Optional.of(0));
119+
120+
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
121+
proxyConfig.setBrokerClientAuthenticationParameters(
122+
AuthTokenUtils.createToken(SECRET_KEY, "Proxy", Optional.empty()));
123+
proxyConfig.setAuthenticationProviders(Set.of(AuthenticationProviderToken.class.getName()));
124+
Properties properties = new Properties();
125+
properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
126+
proxyConfig.setProperties(properties);
127+
128+
proxyService = Mockito.spy(new ProxyService(proxyConfig,
129+
new AuthenticationService(
130+
PulsarConfigurationLoader.convertFrom(proxyConfig))));
131+
}
132+
133+
@AfterClass(alwaysRun = true)
134+
@Override
135+
protected void cleanup() throws Exception {
136+
super.internalCleanup();
137+
proxyService.close();
138+
}
139+
140+
private void startProxy(boolean forwardAuthData) throws Exception {
141+
pulsar.getConfiguration().setAuthenticateOriginalAuthData(forwardAuthData);
142+
proxyConfig.setForwardAuthorizationCredentials(forwardAuthData);
143+
proxyService.start();
144+
}
145+
146+
@DataProvider
147+
Object[] forwardAuthDataProvider() {
148+
return new Object[]{true, false};
149+
}
150+
151+
@Test(dataProvider = "forwardAuthDataProvider")
152+
public void testAuthDataRefresh(boolean forwardAuthData) throws Exception {
153+
log.info("-- Starting {} test --", methodName);
154+
155+
startProxy(forwardAuthData);
156+
157+
AuthenticationToken authenticationToken = new AuthenticationToken(() -> {
158+
Calendar calendar = Calendar.getInstance();
159+
calendar.add(Calendar.SECOND, 1);
160+
return AuthTokenUtils.createToken(SECRET_KEY, "client", Optional.of(calendar.getTime()));
161+
});
162+
163+
pulsarClient = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
164+
.authentication(authenticationToken)
165+
.build();
166+
167+
String topic = "persistent://my-tenant/my-ns/my-topic1";
168+
@Cleanup
169+
Producer<byte[]> ignored = spy(pulsarClient.newProducer()
170+
.topic(topic).create());
171+
172+
PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
173+
Set<CompletableFuture<ClientCnx>> connections = pulsarClientImpl.getCnxPool().getConnections();
174+
175+
Awaitility.await().during(4, SECONDS).untilAsserted(() -> {
176+
pulsarClient.getPartitionsForTopic(topic).get();
177+
assertTrue(connections.stream().allMatch(n -> {
178+
try {
179+
ClientCnx clientCnx = n.get();
180+
long timestamp = clientCnx.getLastDisconnectedTimestamp();
181+
return timestamp == 0;
182+
} catch (Exception e) {
183+
throw new RuntimeException(e);
184+
}
185+
}));
186+
});
187+
}
188+
}

0 commit comments

Comments
 (0)
Please sign in to comment.