Skip to content

Commit e79c85d

Browse files
committed
Fix
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
1 parent 08df28a commit e79c85d

File tree

6 files changed

+344
-27
lines changed

6 files changed

+344
-27
lines changed

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java

+14-3
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@
104104
public class ClientCnx extends PulsarHandler {
105105

106106
protected final Authentication authentication;
107-
private State state;
107+
protected State state;
108108

109109
@Getter
110110
private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
@@ -155,7 +155,7 @@ public class ClientCnx extends PulsarHandler {
155155

156156
private final int maxNumberOfRejectedRequestPerConnection;
157157
private final int rejectedRequestResetTimeSec = 60;
158-
private final int protocolVersion;
158+
protected final int protocolVersion;
159159
private final long operationTimeoutMs;
160160

161161
protected String proxyToTargetBrokerAddress = null;
@@ -176,7 +176,10 @@ public class ClientCnx extends PulsarHandler {
176176
@Getter
177177
private final ClientCnxIdleState idleState;
178178

179-
enum State {
179+
@Getter
180+
private long lastDisconnectedTimestamp;
181+
182+
protected enum State {
180183
None, SentConnectFrame, Ready, Failed, Connecting
181184
}
182185

@@ -281,6 +284,7 @@ protected ByteBuf newConnectCommand() throws Exception {
281284
@Override
282285
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
283286
super.channelInactive(ctx);
287+
lastDisconnectedTimestamp = System.currentTimeMillis();
284288
log.info("{} Disconnected", ctx.channel());
285289
if (!connectionFuture.isDone()) {
286290
connectionFuture.completeExceptionally(new PulsarClientException("Connection already closed"));
@@ -1243,6 +1247,13 @@ public void close() {
12431247
}
12441248
}
12451249

1250+
protected void closeWithException(Throwable e) {
1251+
if (ctx != null) {
1252+
connectionFuture.completeExceptionally(e);
1253+
ctx.close();
1254+
}
1255+
}
1256+
12461257
private void checkRequestTimeout() {
12471258
while (!requestTimeoutQueue.isEmpty()) {
12481259
RequestTime request = requestTimeoutQueue.peek();

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,19 @@
3535
import java.net.URI;
3636
import java.net.URISyntaxException;
3737
import java.util.ArrayList;
38+
import java.util.Collections;
3839
import java.util.Iterator;
3940
import java.util.List;
4041
import java.util.Map;
4142
import java.util.Optional;
4243
import java.util.Random;
44+
import java.util.Set;
4345
import java.util.concurrent.CompletableFuture;
4446
import java.util.concurrent.ConcurrentHashMap;
4547
import java.util.concurrent.ConcurrentMap;
4648
import java.util.concurrent.TimeUnit;
4749
import java.util.function.Supplier;
50+
import java.util.stream.Collectors;
4851
import org.apache.commons.lang3.StringUtils;
4952
import org.apache.pulsar.client.api.PulsarClientException;
5053
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
@@ -453,5 +456,9 @@ public void doMarkAndReleaseUselessConnections(){
453456
// Do release idle connections.
454457
releaseIdleConnectionTaskList.forEach(Runnable::run);
455458
}
456-
}
457459

460+
public Set<CompletableFuture<ClientCnx>> getConnections() {
461+
return Collections.unmodifiableSet(
462+
pool.values().stream().flatMap(n -> n.values().stream()).collect(Collectors.toSet()));
463+
}
464+
}

pulsar-proxy/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,11 @@
174174
<artifactId>ipaddress</artifactId>
175175
<version>${seancfoley.ipaddress.version}</version>
176176
</dependency>
177+
<dependency>
178+
<groupId>org.awaitility</groupId>
179+
<artifactId>awaitility</artifactId>
180+
<scope>test</scope>
181+
</dependency>
177182
</dependencies>
178183
<build>
179184
<plugins>

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java

+48-13
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,42 @@
1818
*/
1919
package org.apache.pulsar.proxy.server;
2020

21+
import static com.google.common.base.Preconditions.checkArgument;
2122
import io.netty.buffer.ByteBuf;
2223
import io.netty.channel.EventLoopGroup;
24+
import java.util.Arrays;
25+
import lombok.extern.slf4j.Slf4j;
2326
import org.apache.pulsar.PulsarVersion;
2427
import org.apache.pulsar.client.impl.ClientCnx;
2528
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
2629
import org.apache.pulsar.common.api.AuthData;
30+
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
2731
import org.apache.pulsar.common.protocol.Commands;
28-
import org.slf4j.Logger;
29-
import org.slf4j.LoggerFactory;
3032

33+
@Slf4j
3134
public class ProxyClientCnx extends ClientCnx {
35+
private final boolean forwardClientAuthData;
36+
private final String clientAuthMethod;
37+
private final String clientAuthRole;
38+
private final AuthData clientAuthData;
39+
private final Runnable refreshClientAuthDataNotifier;
3240

33-
String clientAuthRole;
34-
AuthData clientAuthData;
35-
String clientAuthMethod;
36-
int protocolVersion;
37-
41+
@Deprecated
3842
public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
3943
AuthData clientAuthData, String clientAuthMethod, int protocolVersion) {
40-
super(conf, eventLoopGroup);
44+
this(conf, eventLoopGroup, clientAuthRole, clientAuthData, null,
45+
clientAuthMethod, protocolVersion, clientAuthData != null);
46+
}
47+
48+
public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
49+
AuthData clientAuthData, Runnable refreshClientAuthDataNotifier,
50+
String clientAuthMethod, int protocolVersion, boolean forwardClientAuthData) {
51+
super(conf, eventLoopGroup, protocolVersion);
4152
this.clientAuthRole = clientAuthRole;
4253
this.clientAuthData = clientAuthData;
54+
this.refreshClientAuthDataNotifier = refreshClientAuthDataNotifier;
4355
this.clientAuthMethod = clientAuthMethod;
44-
this.protocolVersion = protocolVersion;
56+
this.forwardClientAuthData = forwardClientAuthData;
4557
}
4658

4759
@Override
@@ -54,10 +66,33 @@ protected ByteBuf newConnectCommand() throws Exception {
5466

5567
authenticationDataProvider = authentication.getAuthData(remoteHostName);
5668
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
57-
return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
58-
PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
59-
clientAuthMethod);
69+
return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
70+
PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
71+
clientAuthMethod);
6072
}
6173

62-
private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
74+
@Override
75+
protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
76+
checkArgument(authChallenge.hasChallenge());
77+
checkArgument(authChallenge.getChallenge().hasAuthData());
78+
79+
boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
80+
if (!forwardClientAuthData || !isRefresh || refreshClientAuthDataNotifier == null) {
81+
super.handleAuthChallenge(authChallenge);
82+
return;
83+
}
84+
85+
try {
86+
if (log.isDebugEnabled()) {
87+
log.debug("{} Request to refresh the original client authentication data", ctx.channel());
88+
}
89+
refreshClientAuthDataNotifier.run();
90+
if (state == State.SentConnectFrame) {
91+
state = State.Connecting;
92+
}
93+
} catch (Exception e) {
94+
log.error("{} Failed to send the auth challenge command: {}", ctx.channel(), e);
95+
closeWithException(e);
96+
}
97+
}
6398
}

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java

+81-10
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import javax.naming.AuthenticationException;
4646
import javax.net.ssl.SSLSession;
4747
import lombok.Getter;
48+
import org.apache.pulsar.PulsarVersion;
4849
import org.apache.pulsar.broker.PulsarServerException;
4950
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
5051
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
@@ -316,12 +317,13 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie
316317
this.clientAuthData = clientData;
317318
this.clientAuthMethod = authMethod;
318319
}
319-
clientCnxSupplier =
320-
() -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
321-
clientAuthMethod, protocolVersionToAdvertise);
320+
clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole,
321+
clientAuthData,
322+
this::requestRefreshClientAuthData,
323+
clientAuthMethod, protocolVersionToAdvertise,
324+
service.getConfiguration().isForwardAuthorizationCredentials());
322325
} else {
323-
clientCnxSupplier =
324-
() -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
326+
clientCnxSupplier = () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
325327
}
326328

327329
if (this.connectionPool == null) {
@@ -423,16 +425,22 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec
423425
}
424426

425427
// According to auth result, send newConnected or newAuthChallenge command.
426-
private void doAuthentication(AuthData clientData) throws Exception {
428+
private void doAuthentication(AuthData clientData)
429+
throws Exception {
427430
AuthData brokerData = authState.authenticate(clientData);
428431
// authentication has completed, will send newConnected command.
429432
if (authState.isComplete()) {
430433
clientAuthRole = authState.getAuthRole();
431434
if (LOG.isDebugEnabled()) {
432435
LOG.debug("[{}] Client successfully authenticated with {} role {}",
433-
remoteAddress, authMethod, clientAuthRole);
436+
remoteAddress, authMethod, clientAuthRole);
437+
}
438+
439+
// First connection
440+
if (this.connectionPool == null || state == State.Connecting) {
441+
// authentication has completed, will send newConnected command.
442+
completeConnect(clientData);
434443
}
435-
completeConnect(clientData);
436444
return;
437445
}
438446

@@ -441,7 +449,7 @@ private void doAuthentication(AuthData clientData) throws Exception {
441449
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
442450
if (LOG.isDebugEnabled()) {
443451
LOG.debug("[{}] Authentication in progress client by method {}.",
444-
remoteAddress, authMethod);
452+
remoteAddress, authMethod);
445453
}
446454
state = State.Connecting;
447455
}
@@ -523,7 +531,6 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
523531

524532
@Override
525533
protected void handleAuthResponse(CommandAuthResponse authResponse) {
526-
checkArgument(state == State.Connecting);
527534
checkArgument(authResponse.hasResponse());
528535
checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName());
529536

@@ -535,6 +542,53 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
535542
try {
536543
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
537544
doAuthentication(clientData);
545+
if (connectionPool != null && state == State.ProxyLookupRequests) {
546+
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
547+
connectionPool.getConnections().forEach(toBrokerCnxFuture -> {
548+
String clientVersion;
549+
if (authResponse.hasClientVersion()) {
550+
clientVersion = authResponse.getClientVersion();
551+
} else {
552+
clientVersion = PulsarVersion.getVersion();
553+
}
554+
int protocolVersion;
555+
if (authResponse.hasProtocolVersion()) {
556+
protocolVersion = authResponse.getProtocolVersion();
557+
} else {
558+
protocolVersion = Commands.getCurrentProtocolVersion();
559+
}
560+
561+
ByteBuf cmd =
562+
Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
563+
toBrokerCnxFuture.thenAccept(toBrokerCnx -> toBrokerCnx.ctx().writeAndFlush(cmd)
564+
.addListener(writeFuture -> {
565+
if (writeFuture.isSuccess()) {
566+
if (LOG.isDebugEnabled()) {
567+
LOG.debug("{} authentication is refreshed successfully by {}" +
568+
", auth method: {} ",
569+
toBrokerCnx.ctx().channel(), ctx.channel(), clientAuthMethod);
570+
}
571+
} else {
572+
LOG.error("Failed to forward the auth response command "
573+
+ "from the proxy to the broker through the proxy client, "
574+
+ "proxy: {}, proxy client: {}",
575+
ctx.channel(),
576+
toBrokerCnx.ctx().channel(),
577+
writeFuture.cause());
578+
toBrokerCnx.ctx().channel().pipeline().fireExceptionCaught(writeFuture.cause());
579+
}
580+
}))
581+
.whenComplete((__, ex) -> {
582+
if (ex != null) {
583+
LOG.error("Failed to forward the auth response command "
584+
+ "from the proxy to the broker through the proxy client, "
585+
+ "proxy: {}",
586+
ctx().channel(), ex);
587+
}
588+
});
589+
});
590+
}
591+
}
538592
} catch (Exception e) {
539593
String msg = "Unable to handleAuthResponse";
540594
LOG.warn("[{}] {} ", remoteAddress, msg, e);
@@ -543,6 +597,23 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
543597
}
544598
}
545599

600+
private void requestRefreshClientAuthData() {
601+
ctx.writeAndFlush(Commands.newAuthChallenge(clientAuthMethod, AuthData.REFRESH_AUTH_DATA,
602+
protocolVersionToAdvertise))
603+
.addListener(writeFuture -> {
604+
if (writeFuture.isSuccess()) {
605+
if (LOG.isDebugEnabled()) {
606+
LOG.debug("{} Sent auth challenge to client to refresh credentials with method: {}",
607+
ctx.channel(), clientAuthMethod);
608+
}
609+
} else {
610+
LOG.error("{} Failed to send request for mutual auth to client", ctx.channel(),
611+
writeFuture.cause());
612+
ctx.channel().pipeline().fireExceptionCaught(writeFuture.cause());
613+
}
614+
});
615+
}
616+
546617
@Override
547618
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
548619
checkArgument(state == State.ProxyLookupRequests);

0 commit comments

Comments
 (0)