Skip to content

Commit 370f6d7

Browse files
michaeljmarshallRobertIndie
authored andcommitted
[fix][broker] Implement authenticateAsync for AuthenticationProviderList (#20132)
PIP: #12105 and #19771 ### Motivation With the implementation of asynchronous authentication in PIP 97, I missed a case in the `AuthenticationProviderList` where we need to implement the `authenticateAsync` methods. This PR is necessary for making the `AuthenticationProviderToken` and the `AuthenticationProviderOpenID` work together, which is necessary for anyone transitioning to `AuthenticationProviderOpenID`. ### Modifications * Implement `AuthenticationListState#authenticateAsync` using a recursive algorithm that first attempts to authenticate the client using the current `authState` and then tries the remaining options. * Implement `AuthenticationProviderList#authenticateAsync` using a recursive algorithm that attempts each provider sequentially. * Add test to `AuthenticationProviderListTest` that exercises this method. It didn't technically fail previously, but it's worth adding. * Add test to `AuthenticationProviderOpenIDIntegrationTest` to cover the exact failures that were causing problems. (cherry picked from commit 58ccf02)
1 parent 13eb2a7 commit 370f6d7

File tree

3 files changed

+175
-1
lines changed

3 files changed

+175
-1
lines changed

pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java

+57
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.github.tomakehurst.wiremock.WireMockServer;
3232
import io.jsonwebtoken.SignatureAlgorithm;
3333
import io.jsonwebtoken.impl.DefaultJwtBuilder;
34+
import io.jsonwebtoken.io.Decoders;
3435
import io.jsonwebtoken.security.Keys;
3536
import java.io.IOException;
3637
import java.nio.file.Files;
@@ -41,13 +42,19 @@
4142
import java.util.Base64;
4243
import java.util.Date;
4344
import java.util.HashMap;
45+
import java.util.Optional;
4446
import java.util.Properties;
4547
import java.util.Set;
4648
import java.util.concurrent.ExecutionException;
4749
import javax.naming.AuthenticationException;
50+
import lombok.Cleanup;
4851
import org.apache.pulsar.broker.ServiceConfiguration;
4952
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
53+
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
54+
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
55+
import org.apache.pulsar.broker.authentication.AuthenticationService;
5056
import org.apache.pulsar.broker.authentication.AuthenticationState;
57+
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
5158
import org.apache.pulsar.common.api.AuthData;
5259
import org.testng.annotations.AfterClass;
5360
import org.testng.annotations.BeforeClass;
@@ -438,6 +445,56 @@ public void testAuthenticationStateOpenIDForTokenExpiration() throws Exception {
438445
assertTrue(state.isExpired());
439446
}
440447

448+
/**
449+
* This test covers the migration scenario where you have both the Token and OpenID providers. It ensures
450+
* both kinds of authentication work.
451+
* @throws Exception
452+
*/
453+
@Test
454+
public void testAuthenticationProviderListStateSuccess() throws Exception {
455+
ServiceConfiguration conf = new ServiceConfiguration();
456+
conf.setAuthenticationEnabled(true);
457+
conf.setAuthenticationProviders(Set.of(AuthenticationProviderOpenID.class.getName(),
458+
AuthenticationProviderToken.class.getName()));
459+
Properties props = conf.getProperties();
460+
props.setProperty(AuthenticationProviderOpenID.REQUIRE_HTTPS, "false");
461+
props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "allowed-audience");
462+
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, issuer);
463+
464+
// Set up static token
465+
KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
466+
// Use public key for validation
467+
String publicKeyStr = AuthTokenUtils.encodeKeyBase64(keyPair.getPublic());
468+
props.setProperty("tokenPublicKey", publicKeyStr);
469+
// Use private key to generate token
470+
String privateKeyStr = AuthTokenUtils.encodeKeyBase64(keyPair.getPrivate());
471+
PrivateKey privateKey = AuthTokenUtils.decodePrivateKey(Decoders.BASE64.decode(privateKeyStr),
472+
SignatureAlgorithm.RS256);
473+
String staticToken = AuthTokenUtils.createToken(privateKey, "superuser", Optional.empty());
474+
475+
@Cleanup
476+
AuthenticationService service = new AuthenticationService(conf);
477+
AuthenticationProvider provider = service.getAuthenticationProvider("token");
478+
479+
// First, authenticate using OIDC
480+
String role = "superuser";
481+
String oidcToken = generateToken(validJwk, issuer, role, "allowed-audience", 0L, 0L, 10000L);
482+
assertEquals(role, provider.authenticateAsync(new AuthenticationDataCommand(oidcToken)).get());
483+
484+
// Authenticate using the static token
485+
assertEquals("superuser", provider.authenticateAsync(new AuthenticationDataCommand(staticToken)).get());
486+
487+
// Use authenticationState to authentication using OIDC
488+
AuthenticationState state1 = service.getAuthenticationProvider("token").newAuthState(null, null, null);
489+
assertNull(state1.authenticateAsync(AuthData.of(oidcToken.getBytes())).get());
490+
assertEquals(state1.getAuthRole(), role);
491+
492+
// Use authenticationState to authentication using static token
493+
AuthenticationState state2 = service.getAuthenticationProvider("token").newAuthState(null, null, null);
494+
assertNull(state2.authenticateAsync(AuthData.of(staticToken.getBytes())).get());
495+
assertEquals(state1.getAuthRole(), role);
496+
}
497+
441498
@Test
442499
void ensureRoleClaimForNonSubClaimReturnsRole() throws Exception {
443500
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java

+94-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.net.SocketAddress;
2323
import java.util.ArrayList;
2424
import java.util.List;
25+
import java.util.concurrent.CompletableFuture;
2526
import javax.naming.AuthenticationException;
2627
import javax.net.ssl.SSLSession;
2728
import javax.servlet.http.HttpServletRequest;
@@ -76,9 +77,12 @@ static <T, W> T applyAuthProcessor(List<W> processors, AuthProcessor<T, W> authF
7677
private static class AuthenticationListState implements AuthenticationState {
7778

7879
private final List<AuthenticationState> states;
79-
private AuthenticationState authState;
80+
private volatile AuthenticationState authState;
8081

8182
AuthenticationListState(List<AuthenticationState> states) {
83+
if (states == null || states.isEmpty()) {
84+
throw new IllegalArgumentException("Authentication state requires at least one state");
85+
}
8286
this.states = states;
8387
this.authState = states.get(0);
8488
}
@@ -96,6 +100,61 @@ public String getAuthRole() throws AuthenticationException {
96100
return getAuthState().getAuthRole();
97101
}
98102

103+
@Override
104+
public CompletableFuture<AuthData> authenticateAsync(AuthData authData) {
105+
// First, attempt to authenticate with the current auth state
106+
CompletableFuture<AuthData> authChallengeFuture = new CompletableFuture<>();
107+
authState
108+
.authenticateAsync(authData)
109+
.whenComplete((authChallenge, ex) -> {
110+
if (ex == null) {
111+
// Current authState is still correct. Just need to return the authChallenge.
112+
authChallengeFuture.complete(authChallenge);
113+
} else {
114+
if (log.isDebugEnabled()) {
115+
log.debug("Authentication failed for auth provider " + authState.getClass() + ": ", ex);
116+
}
117+
authenticateRemainingAuthStates(authChallengeFuture, authData, ex, states.size() - 1);
118+
}
119+
});
120+
return authChallengeFuture;
121+
}
122+
123+
private void authenticateRemainingAuthStates(CompletableFuture<AuthData> authChallengeFuture,
124+
AuthData clientAuthData,
125+
Throwable previousException,
126+
int index) {
127+
if (index < 0) {
128+
if (previousException == null) {
129+
previousException = new AuthenticationException("Authentication required");
130+
}
131+
AuthenticationMetrics.authenticateFailure(AuthenticationProviderList.class.getSimpleName(),
132+
"authentication-provider-list", "Authentication required");
133+
authChallengeFuture.completeExceptionally(previousException);
134+
return;
135+
}
136+
AuthenticationState state = states.get(index);
137+
if (state == authState) {
138+
// Skip the current auth state
139+
authenticateRemainingAuthStates(authChallengeFuture, clientAuthData, null, index - 1);
140+
} else {
141+
state.authenticateAsync(clientAuthData)
142+
.whenComplete((authChallenge, ex) -> {
143+
if (ex == null) {
144+
// Found the correct auth state
145+
authState = state;
146+
authChallengeFuture.complete(authChallenge);
147+
} else {
148+
if (log.isDebugEnabled()) {
149+
log.debug("Authentication failed for auth provider "
150+
+ authState.getClass() + ": ", ex);
151+
}
152+
authenticateRemainingAuthStates(authChallengeFuture, clientAuthData, ex, index - 1);
153+
}
154+
});
155+
}
156+
}
157+
99158
@Override
100159
public AuthData authenticate(AuthData authData) throws AuthenticationException {
101160
return applyAuthProcessor(
@@ -160,6 +219,40 @@ public String getAuthMethodName() {
160219
return providers.get(0).getAuthMethodName();
161220
}
162221

222+
@Override
223+
public CompletableFuture<String> authenticateAsync(AuthenticationDataSource authData) {
224+
CompletableFuture<String> roleFuture = new CompletableFuture<>();
225+
authenticateRemainingAuthProviders(roleFuture, authData, null, providers.size() - 1);
226+
return roleFuture;
227+
}
228+
229+
private void authenticateRemainingAuthProviders(CompletableFuture<String> roleFuture,
230+
AuthenticationDataSource authData,
231+
Throwable previousException,
232+
int index) {
233+
if (index < 0) {
234+
if (previousException == null) {
235+
previousException = new AuthenticationException("Authentication required");
236+
}
237+
AuthenticationMetrics.authenticateFailure(AuthenticationProviderList.class.getSimpleName(),
238+
"authentication-provider-list", "Authentication required");
239+
roleFuture.completeExceptionally(previousException);
240+
return;
241+
}
242+
AuthenticationProvider provider = providers.get(index);
243+
provider.authenticateAsync(authData)
244+
.whenComplete((role, ex) -> {
245+
if (ex == null) {
246+
roleFuture.complete(role);
247+
} else {
248+
if (log.isDebugEnabled()) {
249+
log.debug("Authentication failed for auth provider " + provider.getClass() + ": ", ex);
250+
}
251+
authenticateRemainingAuthProviders(roleFuture, authData, ex, index - 1);
252+
}
253+
});
254+
}
255+
163256
@Override
164257
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
165258
return applyAuthProcessor(

pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java

+24
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,30 @@ public void testAuthenticate() throws Exception {
161161
testAuthenticate(tokenBB, SUBJECT_B);
162162
}
163163

164+
private void testAuthenticateAsync(String token, String expectedSubject) throws Exception {
165+
String actualSubject = authProvider.authenticateAsync(new AuthenticationDataSource() {
166+
@Override
167+
public boolean hasDataFromCommand() {
168+
return true;
169+
}
170+
171+
@Override
172+
public String getCommandData() {
173+
return token;
174+
}
175+
}).get();
176+
assertEquals(actualSubject, expectedSubject);
177+
}
178+
179+
@Test
180+
public void testAuthenticateAsync() throws Exception {
181+
testAuthenticateAsync(tokenAA, SUBJECT_A);
182+
testAuthenticateAsync(tokenAB, SUBJECT_B);
183+
testAuthenticateAsync(tokenBA, SUBJECT_A);
184+
testAuthenticateAsync(tokenBB, SUBJECT_B);
185+
}
186+
187+
164188
private AuthenticationState newAuthState(String token, String expectedSubject) throws Exception {
165189
// Must pass the token to the newAuthState for legacy reasons.
166190
AuthenticationState authState = authProvider.newAuthState(

0 commit comments

Comments
 (0)