Skip to content

Commit fa6af43

Browse files
[feat][proxy] PIP 97: Implement for ProxyConnection (#19292)
PIP: #12105 ### Motivation Implement asynchronous auth for the proxy connection. This is one of the core PRs for implementing #12105. ### Modifications * Update `ProxyConnection` class to asynchronously handle the authentication result. The result is handled on the handler's event loop to ensure correctness. * Update `ProxyAuthenticationTest` class to implement async auth methods and to make authentication asynchronous to test that code path. ### Verifying this change There is an updated test, but it doesn't cover all code paths in this PR. ### Documentation - [x] `doc-not-needed` We do not need to document this portion of PIP 97. ### Matching PR in forked repository PR in forked repository: michaeljmarshall#16
1 parent 0273f31 commit fa6af43

File tree

2 files changed

+61
-33
lines changed

2 files changed

+61
-33
lines changed

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java

+54-30
Original file line numberDiff line numberDiff line change
@@ -311,13 +311,9 @@ protected static boolean isTlsChannel(Channel channel) {
311311
return channel.pipeline().get(ServiceChannelInitializer.TLS_HANDLER) != null;
312312
}
313313

314-
private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
314+
private synchronized void completeConnect() throws PulsarClientException {
315315
Supplier<ClientCnx> clientCnxSupplier;
316316
if (service.getConfiguration().isAuthenticationEnabled()) {
317-
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
318-
this.clientAuthData = clientData;
319-
this.clientAuthMethod = authMethod;
320-
}
321317
clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole,
322318
clientAuthData, clientAuthMethod, protocolVersionToAdvertise,
323319
service.getConfiguration().isForwardAuthorizationCredentials(), this);
@@ -423,29 +419,51 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec
423419
// According to auth result, send newConnected or newAuthChallenge command.
424420
private void doAuthentication(AuthData clientData)
425421
throws Exception {
426-
AuthData brokerData = authState.authenticate(clientData);
427-
// authentication has completed, will send newConnected command.
428-
if (authState.isComplete()) {
429-
clientAuthRole = authState.getAuthRole();
430-
if (LOG.isDebugEnabled()) {
431-
LOG.debug("[{}] Client successfully authenticated with {} role {}",
432-
remoteAddress, authMethod, clientAuthRole);
433-
}
422+
authState
423+
.authenticateAsync(clientData)
424+
.whenCompleteAsync((authChallenge, throwable) -> {
425+
if (throwable == null) {
426+
authChallengeSuccessCallback(authChallenge);
427+
} else {
428+
authenticationFailedCallback(throwable);
429+
}
430+
}, ctx.executor());
431+
}
432+
433+
protected void authenticationFailedCallback(Throwable t) {
434+
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, t);
435+
final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate");
436+
writeAndFlushAndClose(msg);
437+
}
434438

435-
// First connection
436-
if (this.connectionPool == null || state == State.Connecting) {
437-
// authentication has completed, will send newConnected command.
438-
completeConnect(clientData);
439+
// Always run in this class's event loop.
440+
protected void authChallengeSuccessCallback(AuthData authChallenge) {
441+
try {
442+
// authentication has completed, will send newConnected command.
443+
if (authChallenge == null) {
444+
clientAuthRole = authState.getAuthRole();
445+
if (LOG.isDebugEnabled()) {
446+
LOG.debug("[{}] Client successfully authenticated with {} role {}",
447+
remoteAddress, authMethod, clientAuthRole);
448+
}
449+
450+
// First connection
451+
if (this.connectionPool == null || state == State.Connecting) {
452+
// authentication has completed, will send newConnected command.
453+
completeConnect();
454+
}
455+
return;
439456
}
440-
return;
441-
}
442457

443-
// auth not complete, continue auth with client side.
444-
final ByteBuf msg = Commands.newAuthChallenge(authMethod, brokerData, protocolVersionToAdvertise);
445-
writeAndFlush(msg);
446-
if (LOG.isDebugEnabled()) {
447-
LOG.debug("[{}] Authentication in progress client by method {}.",
448-
remoteAddress, authMethod);
458+
// auth not complete, continue auth with client side.
459+
final ByteBuf msg = Commands.newAuthChallenge(authMethod, authChallenge, protocolVersionToAdvertise);
460+
writeAndFlush(msg);
461+
if (LOG.isDebugEnabled()) {
462+
LOG.debug("[{}] Authentication in progress client by method {}.",
463+
remoteAddress, authMethod);
464+
}
465+
} catch (Exception e) {
466+
authenticationFailedCallback(e);
449467
}
450468
}
451469

@@ -479,7 +497,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
479497

480498
// authn not enabled, complete
481499
if (!service.getConfiguration().isAuthenticationEnabled()) {
482-
completeConnect(null);
500+
completeConnect();
483501
return;
484502
}
485503

@@ -493,6 +511,14 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
493511
authMethod = "none";
494512
}
495513

514+
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
515+
// We store the first clientData here. Before this commit, we stored the last clientData.
516+
// Since this only works when forwarding single staged authentication, first == last is true.
517+
// Here is an issue to fix the protocol: https://github.com/apache/pulsar/issues/19291.
518+
this.clientAuthData = clientData;
519+
this.clientAuthMethod = authMethod;
520+
}
521+
496522
authenticationProvider = service
497523
.getAuthenticationService()
498524
.getAuthenticationProvider(authMethod);
@@ -504,7 +530,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
504530
.orElseThrow(() ->
505531
new AuthenticationException("No anonymous role, and no authentication provider configured"));
506532

507-
completeConnect(clientData);
533+
completeConnect();
508534
return;
509535
}
510536

@@ -518,9 +544,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
518544
authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
519545
doAuthentication(clientData);
520546
} catch (Exception e) {
521-
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
522-
final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate");
523-
writeAndFlushAndClose(msg);
547+
authenticationFailedCallback(e);
524548
}
525549
}
526550

pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Map.Entry;
3232
import java.util.Optional;
3333
import java.util.Set;
34+
import java.util.concurrent.CompletableFuture;
3435

3536
import javax.naming.AuthenticationException;
3637

@@ -136,7 +137,7 @@ public String getAuthMethodName() {
136137
}
137138

138139
@Override
139-
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
140+
public CompletableFuture<String> authenticateAsync(AuthenticationDataSource authData) {
140141
String commandData = null;
141142
if (authData.hasDataFromCommand()) {
142143
commandData = authData.getCommandData();
@@ -150,9 +151,12 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
150151
long currentTimeInMillis = System.currentTimeMillis();
151152
if (expiryTimeInMillis < currentTimeInMillis) {
152153
log.warn("Auth failed due to timeout");
153-
throw new AuthenticationException("Authentication data has been expired");
154+
return CompletableFuture
155+
.failedFuture(new AuthenticationException("Authentication data has been expired"));
154156
}
155-
return element.get("entityType").getAsString();
157+
final String result = element.get("entityType").getAsString();
158+
// Run in another thread to attempt to test the async logic
159+
return CompletableFuture.supplyAsync(() -> result);
156160
}
157161
}
158162

0 commit comments

Comments
 (0)