Skip to content

Commit c952f3c

Browse files
authored
[fix][proxy] Fix refresh client auth (#17831)
* [fix][proxy] Fix refresh client auth Signed-off-by: Zixuan Liu <nodeces@gmail.com> * Fix style Signed-off-by: Zixuan Liu <nodeces@gmail.com>
1 parent df5e0e1 commit c952f3c

File tree

6 files changed

+338
-29
lines changed

6 files changed

+338
-29
lines changed

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java

+14-3
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@
104104
public class ClientCnx extends PulsarHandler {
105105

106106
protected final Authentication authentication;
107-
private State state;
107+
protected State state;
108108

109109
@Getter
110110
private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
@@ -155,7 +155,7 @@ public class ClientCnx extends PulsarHandler {
155155

156156
private final int maxNumberOfRejectedRequestPerConnection;
157157
private final int rejectedRequestResetTimeSec = 60;
158-
private final int protocolVersion;
158+
protected final int protocolVersion;
159159
private final long operationTimeoutMs;
160160

161161
protected String proxyToTargetBrokerAddress = null;
@@ -176,7 +176,10 @@ public class ClientCnx extends PulsarHandler {
176176
@Getter
177177
private final ClientCnxIdleState idleState;
178178

179-
enum State {
179+
@Getter
180+
private long lastDisconnectedTimestamp;
181+
182+
protected enum State {
180183
None, SentConnectFrame, Ready, Failed, Connecting
181184
}
182185

@@ -281,6 +284,7 @@ protected ByteBuf newConnectCommand() throws Exception {
281284
@Override
282285
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
283286
super.channelInactive(ctx);
287+
lastDisconnectedTimestamp = System.currentTimeMillis();
284288
log.info("{} Disconnected", ctx.channel());
285289
if (!connectionFuture.isDone()) {
286290
connectionFuture.completeExceptionally(new PulsarClientException("Connection already closed"));
@@ -1243,6 +1247,13 @@ public void close() {
12431247
}
12441248
}
12451249

1250+
protected void closeWithException(Throwable e) {
1251+
if (ctx != null) {
1252+
connectionFuture.completeExceptionally(e);
1253+
ctx.close();
1254+
}
1255+
}
1256+
12461257
private void checkRequestTimeout() {
12471258
while (!requestTimeoutQueue.isEmpty()) {
12481259
RequestTime request = requestTimeoutQueue.peek();

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,19 @@
3535
import java.net.URI;
3636
import java.net.URISyntaxException;
3737
import java.util.ArrayList;
38+
import java.util.Collections;
3839
import java.util.Iterator;
3940
import java.util.List;
4041
import java.util.Map;
4142
import java.util.Optional;
4243
import java.util.Random;
44+
import java.util.Set;
4345
import java.util.concurrent.CompletableFuture;
4446
import java.util.concurrent.ConcurrentHashMap;
4547
import java.util.concurrent.ConcurrentMap;
4648
import java.util.concurrent.TimeUnit;
4749
import java.util.function.Supplier;
50+
import java.util.stream.Collectors;
4851
import org.apache.commons.lang3.StringUtils;
4952
import org.apache.pulsar.client.api.PulsarClientException;
5053
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
@@ -453,5 +456,9 @@ public void doMarkAndReleaseUselessConnections(){
453456
// Do release idle connections.
454457
releaseIdleConnectionTaskList.forEach(Runnable::run);
455458
}
456-
}
457459

460+
public Set<CompletableFuture<ClientCnx>> getConnections() {
461+
return Collections.unmodifiableSet(
462+
pool.values().stream().flatMap(n -> n.values().stream()).collect(Collectors.toSet()));
463+
}
464+
}

pulsar-proxy/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,11 @@
180180
<artifactId>ipaddress</artifactId>
181181
<version>${seancfoley.ipaddress.version}</version>
182182
</dependency>
183+
<dependency>
184+
<groupId>org.awaitility</groupId>
185+
<artifactId>awaitility</artifactId>
186+
<scope>test</scope>
187+
</dependency>
183188
</dependencies>
184189
<build>
185190
<plugins>

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java

+63-14
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,35 @@
1818
*/
1919
package org.apache.pulsar.proxy.server;
2020

21+
import static com.google.common.base.Preconditions.checkArgument;
2122
import io.netty.buffer.ByteBuf;
2223
import io.netty.channel.EventLoopGroup;
24+
import java.util.Arrays;
25+
import lombok.extern.slf4j.Slf4j;
2326
import org.apache.pulsar.PulsarVersion;
2427
import org.apache.pulsar.client.impl.ClientCnx;
2528
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
2629
import org.apache.pulsar.common.api.AuthData;
30+
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
2731
import org.apache.pulsar.common.protocol.Commands;
28-
import org.slf4j.Logger;
29-
import org.slf4j.LoggerFactory;
3032

33+
@Slf4j
3134
public class ProxyClientCnx extends ClientCnx {
32-
33-
String clientAuthRole;
34-
AuthData clientAuthData;
35-
String clientAuthMethod;
36-
int protocolVersion;
35+
private final boolean forwardClientAuthData;
36+
private final String clientAuthMethod;
37+
private final String clientAuthRole;
38+
private final AuthData clientAuthData;
39+
private final ProxyConnection proxyConnection;
3740

3841
public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
39-
AuthData clientAuthData, String clientAuthMethod, int protocolVersion) {
40-
super(conf, eventLoopGroup);
42+
AuthData clientAuthData, String clientAuthMethod, int protocolVersion,
43+
boolean forwardClientAuthData, ProxyConnection proxyConnection) {
44+
super(conf, eventLoopGroup, protocolVersion);
4145
this.clientAuthRole = clientAuthRole;
4246
this.clientAuthData = clientAuthData;
4347
this.clientAuthMethod = clientAuthMethod;
44-
this.protocolVersion = protocolVersion;
48+
this.forwardClientAuthData = forwardClientAuthData;
49+
this.proxyConnection = proxyConnection;
4550
}
4651

4752
@Override
@@ -54,10 +59,54 @@ protected ByteBuf newConnectCommand() throws Exception {
5459

5560
authenticationDataProvider = authentication.getAuthData(remoteHostName);
5661
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
57-
return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
58-
PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
59-
clientAuthMethod);
62+
return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
63+
PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
64+
clientAuthMethod);
6065
}
6166

62-
private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class);
67+
@Override
68+
protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
69+
checkArgument(authChallenge.hasChallenge());
70+
checkArgument(authChallenge.getChallenge().hasAuthData());
71+
72+
boolean isRefresh = Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData());
73+
if (!forwardClientAuthData || !isRefresh) {
74+
super.handleAuthChallenge(authChallenge);
75+
return;
76+
}
77+
78+
try {
79+
if (log.isDebugEnabled()) {
80+
log.debug("Proxy {} request to refresh the original client authentication data for "
81+
+ "the proxy client {}", proxyConnection.ctx().channel(), ctx.channel());
82+
}
83+
84+
proxyConnection.ctx().writeAndFlush(Commands.newAuthChallenge(clientAuthMethod, AuthData.REFRESH_AUTH_DATA,
85+
protocolVersion))
86+
.addListener(writeFuture -> {
87+
if (writeFuture.isSuccess()) {
88+
if (log.isDebugEnabled()) {
89+
log.debug("Proxy {} sent the auth challenge to original client to refresh credentials "
90+
+ "with method {} for the proxy client {}",
91+
proxyConnection.ctx().channel(), clientAuthMethod, ctx.channel());
92+
}
93+
} else {
94+
log.error("Failed to send the auth challenge to original client by the proxy {} "
95+
+ "for the proxy client {}",
96+
proxyConnection.ctx().channel(),
97+
ctx.channel(),
98+
writeFuture.cause());
99+
closeWithException(writeFuture.cause());
100+
}
101+
});
102+
103+
if (state == State.SentConnectFrame) {
104+
state = State.Connecting;
105+
}
106+
} catch (Exception e) {
107+
log.error("Failed to send the auth challenge to origin client by the proxy {} for the proxy client {}",
108+
proxyConnection.ctx().channel(), ctx.channel(), e);
109+
closeWithException(e);
110+
}
111+
}
63112
}

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java

+62-11
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import javax.naming.AuthenticationException;
4646
import javax.net.ssl.SSLSession;
4747
import lombok.Getter;
48+
import org.apache.pulsar.PulsarVersion;
4849
import org.apache.pulsar.broker.PulsarServerException;
4950
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
5051
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
@@ -316,12 +317,11 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie
316317
this.clientAuthData = clientData;
317318
this.clientAuthMethod = authMethod;
318319
}
319-
clientCnxSupplier =
320-
() -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
321-
clientAuthMethod, protocolVersionToAdvertise);
320+
clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole,
321+
clientAuthData, clientAuthMethod, protocolVersionToAdvertise,
322+
service.getConfiguration().isForwardAuthorizationCredentials(), this);
322323
} else {
323-
clientCnxSupplier =
324-
() -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
324+
clientCnxSupplier = () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
325325
}
326326

327327
if (this.connectionPool == null) {
@@ -423,16 +423,22 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec
423423
}
424424

425425
// According to auth result, send newConnected or newAuthChallenge command.
426-
private void doAuthentication(AuthData clientData) throws Exception {
426+
private void doAuthentication(AuthData clientData)
427+
throws Exception {
427428
AuthData brokerData = authState.authenticate(clientData);
428429
// authentication has completed, will send newConnected command.
429430
if (authState.isComplete()) {
430431
clientAuthRole = authState.getAuthRole();
431432
if (LOG.isDebugEnabled()) {
432433
LOG.debug("[{}] Client successfully authenticated with {} role {}",
433-
remoteAddress, authMethod, clientAuthRole);
434+
remoteAddress, authMethod, clientAuthRole);
435+
}
436+
437+
// First connection
438+
if (this.connectionPool == null || state == State.Connecting) {
439+
// authentication has completed, will send newConnected command.
440+
completeConnect(clientData);
434441
}
435-
completeConnect(clientData);
436442
return;
437443
}
438444

@@ -441,7 +447,7 @@ private void doAuthentication(AuthData clientData) throws Exception {
441447
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
442448
if (LOG.isDebugEnabled()) {
443449
LOG.debug("[{}] Authentication in progress client by method {}.",
444-
remoteAddress, authMethod);
450+
remoteAddress, authMethod);
445451
}
446452
state = State.Connecting;
447453
}
@@ -523,18 +529,63 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
523529

524530
@Override
525531
protected void handleAuthResponse(CommandAuthResponse authResponse) {
526-
checkArgument(state == State.Connecting);
527532
checkArgument(authResponse.hasResponse());
528533
checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName());
529534

530535
if (LOG.isDebugEnabled()) {
531536
LOG.debug("Received AuthResponse from {}, auth method: {}",
532-
remoteAddress, authResponse.getResponse().getAuthMethodName());
537+
remoteAddress, authResponse.getResponse().getAuthMethodName());
533538
}
534539

535540
try {
536541
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
537542
doAuthentication(clientData);
543+
if (service.getConfiguration().isForwardAuthorizationCredentials()
544+
&& connectionPool != null && state == State.ProxyLookupRequests) {
545+
connectionPool.getConnections().forEach(toBrokerCnxFuture -> {
546+
String clientVersion;
547+
if (authResponse.hasClientVersion()) {
548+
clientVersion = authResponse.getClientVersion();
549+
} else {
550+
clientVersion = PulsarVersion.getVersion();
551+
}
552+
int protocolVersion;
553+
if (authResponse.hasProtocolVersion()) {
554+
protocolVersion = authResponse.getProtocolVersion();
555+
} else {
556+
protocolVersion = Commands.getCurrentProtocolVersion();
557+
}
558+
559+
ByteBuf cmd =
560+
Commands.newAuthResponse(clientAuthMethod, clientData, protocolVersion, clientVersion);
561+
toBrokerCnxFuture.thenAccept(toBrokerCnx -> toBrokerCnx.ctx().writeAndFlush(cmd)
562+
.addListener(writeFuture -> {
563+
if (writeFuture.isSuccess()) {
564+
if (LOG.isDebugEnabled()) {
565+
LOG.debug("{} authentication is refreshed successfully by {}, "
566+
+ "auth method: {} ",
567+
toBrokerCnx.ctx().channel(), ctx.channel(), clientAuthMethod);
568+
}
569+
} else {
570+
LOG.error("Failed to forward the auth response "
571+
+ "from the proxy to the broker through the proxy client, "
572+
+ "proxy: {}, proxy client: {}",
573+
ctx.channel(),
574+
toBrokerCnx.ctx().channel(),
575+
writeFuture.cause());
576+
toBrokerCnx.ctx().channel().pipeline()
577+
.fireExceptionCaught(writeFuture.cause());
578+
}
579+
}))
580+
.whenComplete((__, ex) -> {
581+
if (ex != null) {
582+
LOG.error("Failed to forward the auth response from the proxy to "
583+
+ "the broker through the proxy client, proxy: {}",
584+
ctx().channel(), ex);
585+
}
586+
});
587+
});
588+
}
538589
} catch (Exception e) {
539590
String msg = "Unable to handleAuthResponse";
540591
LOG.warn("[{}] {} ", remoteAddress, msg, e);

0 commit comments

Comments
 (0)