Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: apache/pulsar
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: dd7d9c8b2dfe91a4bf89203f2c38385ab5e332b2
Choose a base ref
..
head repository: apache/pulsar
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: be12f6d7215d0c551dd4fd694d565734fae47106
Choose a head ref
Showing with 24 additions and 24 deletions.
  1. +24 −24 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Original file line number Diff line number Diff line change
@@ -678,8 +678,8 @@ private void completeConnect(int clientProtoVersion, String clientVersion, boole
}

// According to auth result, send newConnected or newAuthChallenge command.
private CompletableFuture<Void> doAuthentication(AuthData clientData, int clientProtocolVersion,
String clientVersion) throws AuthenticationException {
private CompletableFuture<Void> doAuthenticationAsync(AuthData clientData, int clientProtocolVersion,
String clientVersion) {
boolean useOriginalAuthState = (originalAuthState != null);
if (state == State.Connected) {
// For auth challenge, the authentication state requires to be updated.
@@ -688,10 +688,15 @@ private CompletableFuture<Void> doAuthentication(AuthData clientData, int client
+ "auth role: {}",
useOriginalAuthState, originalPrincipal, authRole);
}
if (useOriginalAuthState) {
originalAuthState = originalAuthenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
} else {
authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
try {
if (useOriginalAuthState) {
originalAuthState =
originalAuthenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
} else {
authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
}
} catch (AuthenticationException e) {
return CompletableFuture.failedFuture(e);
}
}

@@ -702,11 +707,9 @@ private CompletableFuture<Void> doAuthentication(AuthData clientData, int client
AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState;
String authRole = useOriginalAuthState ? originalPrincipal : this.authRole;

CompletableFuture<AuthData> authFuture;
CompletableFuture<AuthData> authFuture = CompletableFuture.completedFuture(null);
if (!authState.isComplete()) {
authFuture = authState.authenticateAsync(clientData);
} else {
authFuture = CompletableFuture.completedFuture(null);
}
return authFuture.thenCompose(nextAuthData -> {
if (nextAuthData == null) {
@@ -894,7 +897,7 @@ protected void handleConnect(CommandConnect connect) {
}

authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
doAuthentication(clientData, clientProtocolVersion, clientVersion).thenCompose(__ -> {
doAuthenticationAsync(clientData, clientProtocolVersion, clientVersion).thenCompose(__ -> {
// This will fail the check if:
// 1. client is coming through a proxy
// 2. we require to validate the original credentials
@@ -960,18 +963,14 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
remoteAddress, authResponse.getResponse().getAuthMethodName());
}

try {
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
doAuthentication(clientData, authResponse.getProtocolVersion(),
authResponse.hasClientVersion() ? authResponse.getClientVersion() : EMPTY).whenComplete(
(__, e) -> {
if (e != null) {
closeWithAuthException("authResponse", e);
}
});
} catch (Exception e) {
closeWithAuthException("authResponse", e);
}
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
String clientVersion = authResponse.hasClientVersion() ? authResponse.getClientVersion() : EMPTY;
doAuthenticationAsync(clientData, authResponse.getProtocolVersion(), clientVersion)
.whenComplete((__, e) -> {
if (e != null) {
closeWithAuthException("authResponse", e);
}
});
}

@Override
@@ -3081,9 +3080,10 @@ public String clientSourceAddress() {
}

private void closeWithAuthException(String operation, Throwable e) {
Throwable unwrapEx = FutureUtil.unwrapCompletionException(e);
service.getPulsarStats().recordConnectionCreateFail();
logAuthException(remoteAddress, operation, getPrincipal(), Optional.empty(), e);
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, e.getMessage()));
logAuthException(remoteAddress, operation, getPrincipal(), Optional.empty(), unwrapEx);
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, unwrapEx.getMessage()));
close();
}