Skip to content

Commit fb2c96a

Browse files
[fix][broker] Implement authenticateAsync for AuthenticationProviderList (apache#20132)
PIP: apache#12105 and apache#19771 With the implementation of asynchronous authentication in PIP 97, I missed a case in the `AuthenticationProviderList` where we need to implement the `authenticateAsync` methods. This PR is necessary for making the `AuthenticationProviderToken` and the `AuthenticationProviderOpenID` work together, which is necessary for anyone transitioning to `AuthenticationProviderOpenID`. * Implement `AuthenticationListState#authenticateAsync` using a recursive algorithm that first attempts to authenticate the client using the current `authState` and then tries the remaining options. * Implement `AuthenticationProviderList#authenticateAsync` using a recursive algorithm that attempts each provider sequentially. * Add test to `AuthenticationProviderListTest` that exercises this method. It didn't technically fail previously, but it's worth adding. * Add test to `AuthenticationProviderOpenIDIntegrationTest` to cover the exact failures that were causing problems. (cherry picked from commit 58ccf02)
1 parent 139209a commit fb2c96a

File tree

3 files changed

+178
-1
lines changed

3 files changed

+178
-1
lines changed

pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java

+60
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.github.tomakehurst.wiremock.WireMockServer;
3232
import io.jsonwebtoken.SignatureAlgorithm;
3333
import io.jsonwebtoken.impl.DefaultJwtBuilder;
34+
import io.jsonwebtoken.io.Decoders;
3435
import io.jsonwebtoken.security.Keys;
3536
import java.io.IOException;
3637
import java.nio.file.FileSystems;
@@ -42,12 +43,19 @@
4243
import java.util.Collections;
4344
import java.util.Date;
4445
import java.util.HashMap;
46+
import java.util.HashSet;
47+
import java.util.Optional;
4548
import java.util.Properties;
4649
import java.util.concurrent.ExecutionException;
4750
import javax.naming.AuthenticationException;
51+
import lombok.Cleanup;
4852
import org.apache.pulsar.broker.ServiceConfiguration;
4953
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
54+
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
55+
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
56+
import org.apache.pulsar.broker.authentication.AuthenticationService;
5057
import org.apache.pulsar.broker.authentication.AuthenticationState;
58+
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
5159
import org.apache.pulsar.common.api.AuthData;
5260
import org.testng.annotations.AfterClass;
5361
import org.testng.annotations.BeforeClass;
@@ -406,6 +414,58 @@ public void testAuthenticationStateOpenIDForTokenExpiration() throws Exception {
406414
assertTrue(state.isExpired());
407415
}
408416

417+
/**
418+
* This test covers the migration scenario where you have both the Token and OpenID providers. It ensures
419+
* both kinds of authentication work.
420+
* @throws Exception
421+
*/
422+
@Test
423+
public void testAuthenticationProviderListStateSuccess() throws Exception {
424+
ServiceConfiguration conf = new ServiceConfiguration();
425+
conf.setAuthenticationEnabled(true);
426+
HashSet<String> providers = new HashSet<>();
427+
providers.add(AuthenticationProviderToken.class.getName());
428+
providers.add(AuthenticationProviderOpenID.class.getName());
429+
conf.setAuthenticationProviders(providers);
430+
Properties props = conf.getProperties();
431+
props.setProperty(AuthenticationProviderOpenID.REQUIRE_HTTPS, "false");
432+
props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "allowed-audience");
433+
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, issuer);
434+
435+
// Set up static token
436+
KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
437+
// Use public key for validation
438+
String publicKeyStr = AuthTokenUtils.encodeKeyBase64(keyPair.getPublic());
439+
props.setProperty("tokenPublicKey", publicKeyStr);
440+
// Use private key to generate token
441+
String privateKeyStr = AuthTokenUtils.encodeKeyBase64(keyPair.getPrivate());
442+
PrivateKey privateKey = AuthTokenUtils.decodePrivateKey(Decoders.BASE64.decode(privateKeyStr),
443+
SignatureAlgorithm.RS256);
444+
String staticToken = AuthTokenUtils.createToken(privateKey, "superuser", Optional.empty());
445+
446+
@Cleanup
447+
AuthenticationService service = new AuthenticationService(conf);
448+
AuthenticationProvider provider = service.getAuthenticationProvider("token");
449+
450+
// First, authenticate using OIDC
451+
String role = "superuser";
452+
String oidcToken = generateToken(validJwk, issuer, role, "allowed-audience", 0L, 0L, 10000L);
453+
assertEquals(role, provider.authenticateAsync(new AuthenticationDataCommand(oidcToken)).get());
454+
455+
// Authenticate using the static token
456+
assertEquals("superuser", provider.authenticateAsync(new AuthenticationDataCommand(staticToken)).get());
457+
458+
// Use authenticationState to authentication using OIDC
459+
AuthenticationState state1 = service.getAuthenticationProvider("token").newAuthState(null, null, null);
460+
assertNull(state1.authenticateAsync(AuthData.of(oidcToken.getBytes())).get());
461+
assertEquals(state1.getAuthRole(), role);
462+
463+
// Use authenticationState to authentication using static token
464+
AuthenticationState state2 = service.getAuthenticationProvider("token").newAuthState(null, null, null);
465+
assertNull(state2.authenticateAsync(AuthData.of(staticToken.getBytes())).get());
466+
assertEquals(state1.getAuthRole(), role);
467+
}
468+
409469
@Test
410470
void ensureRoleClaimForNonSubClaimReturnsRole() throws Exception {
411471
AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java

+94-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.net.SocketAddress;
2323
import java.util.ArrayList;
2424
import java.util.List;
25+
import java.util.concurrent.CompletableFuture;
2526
import javax.naming.AuthenticationException;
2627
import javax.net.ssl.SSLSession;
2728
import javax.servlet.http.HttpServletRequest;
@@ -76,9 +77,12 @@ static <T, W> T applyAuthProcessor(List<W> processors, AuthProcessor<T, W> authF
7677
private static class AuthenticationListState implements AuthenticationState {
7778

7879
private final List<AuthenticationState> states;
79-
private AuthenticationState authState;
80+
private volatile AuthenticationState authState;
8081

8182
AuthenticationListState(List<AuthenticationState> states) {
83+
if (states == null || states.isEmpty()) {
84+
throw new IllegalArgumentException("Authentication state requires at least one state");
85+
}
8286
this.states = states;
8387
this.authState = states.get(0);
8488
}
@@ -96,6 +100,61 @@ public String getAuthRole() throws AuthenticationException {
96100
return getAuthState().getAuthRole();
97101
}
98102

103+
@Override
104+
public CompletableFuture<AuthData> authenticateAsync(AuthData authData) {
105+
// First, attempt to authenticate with the current auth state
106+
CompletableFuture<AuthData> authChallengeFuture = new CompletableFuture<>();
107+
authState
108+
.authenticateAsync(authData)
109+
.whenComplete((authChallenge, ex) -> {
110+
if (ex == null) {
111+
// Current authState is still correct. Just need to return the authChallenge.
112+
authChallengeFuture.complete(authChallenge);
113+
} else {
114+
if (log.isDebugEnabled()) {
115+
log.debug("Authentication failed for auth provider " + authState.getClass() + ": ", ex);
116+
}
117+
authenticateRemainingAuthStates(authChallengeFuture, authData, ex, states.size() - 1);
118+
}
119+
});
120+
return authChallengeFuture;
121+
}
122+
123+
private void authenticateRemainingAuthStates(CompletableFuture<AuthData> authChallengeFuture,
124+
AuthData clientAuthData,
125+
Throwable previousException,
126+
int index) {
127+
if (index < 0) {
128+
if (previousException == null) {
129+
previousException = new AuthenticationException("Authentication required");
130+
}
131+
AuthenticationMetrics.authenticateFailure(AuthenticationProviderList.class.getSimpleName(),
132+
"authentication-provider-list", "Authentication required");
133+
authChallengeFuture.completeExceptionally(previousException);
134+
return;
135+
}
136+
AuthenticationState state = states.get(index);
137+
if (state == authState) {
138+
// Skip the current auth state
139+
authenticateRemainingAuthStates(authChallengeFuture, clientAuthData, null, index - 1);
140+
} else {
141+
state.authenticateAsync(clientAuthData)
142+
.whenComplete((authChallenge, ex) -> {
143+
if (ex == null) {
144+
// Found the correct auth state
145+
authState = state;
146+
authChallengeFuture.complete(authChallenge);
147+
} else {
148+
if (log.isDebugEnabled()) {
149+
log.debug("Authentication failed for auth provider "
150+
+ authState.getClass() + ": ", ex);
151+
}
152+
authenticateRemainingAuthStates(authChallengeFuture, clientAuthData, ex, index - 1);
153+
}
154+
});
155+
}
156+
}
157+
99158
@Override
100159
public AuthData authenticate(AuthData authData) throws AuthenticationException {
101160
return applyAuthProcessor(
@@ -160,6 +219,40 @@ public String getAuthMethodName() {
160219
return providers.get(0).getAuthMethodName();
161220
}
162221

222+
@Override
223+
public CompletableFuture<String> authenticateAsync(AuthenticationDataSource authData) {
224+
CompletableFuture<String> roleFuture = new CompletableFuture<>();
225+
authenticateRemainingAuthProviders(roleFuture, authData, null, providers.size() - 1);
226+
return roleFuture;
227+
}
228+
229+
private void authenticateRemainingAuthProviders(CompletableFuture<String> roleFuture,
230+
AuthenticationDataSource authData,
231+
Throwable previousException,
232+
int index) {
233+
if (index < 0) {
234+
if (previousException == null) {
235+
previousException = new AuthenticationException("Authentication required");
236+
}
237+
AuthenticationMetrics.authenticateFailure(AuthenticationProviderList.class.getSimpleName(),
238+
"authentication-provider-list", "Authentication required");
239+
roleFuture.completeExceptionally(previousException);
240+
return;
241+
}
242+
AuthenticationProvider provider = providers.get(index);
243+
provider.authenticateAsync(authData)
244+
.whenComplete((role, ex) -> {
245+
if (ex == null) {
246+
roleFuture.complete(role);
247+
} else {
248+
if (log.isDebugEnabled()) {
249+
log.debug("Authentication failed for auth provider " + provider.getClass() + ": ", ex);
250+
}
251+
authenticateRemainingAuthProviders(roleFuture, authData, ex, index - 1);
252+
}
253+
});
254+
}
255+
163256
@Override
164257
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
165258
return applyAuthProcessor(

pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java

+24
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,30 @@ public void testAuthenticate() throws Exception {
161161
testAuthenticate(tokenBB, SUBJECT_B);
162162
}
163163

164+
private void testAuthenticateAsync(String token, String expectedSubject) throws Exception {
165+
String actualSubject = authProvider.authenticateAsync(new AuthenticationDataSource() {
166+
@Override
167+
public boolean hasDataFromCommand() {
168+
return true;
169+
}
170+
171+
@Override
172+
public String getCommandData() {
173+
return token;
174+
}
175+
}).get();
176+
assertEquals(actualSubject, expectedSubject);
177+
}
178+
179+
@Test
180+
public void testAuthenticateAsync() throws Exception {
181+
testAuthenticateAsync(tokenAA, SUBJECT_A);
182+
testAuthenticateAsync(tokenAB, SUBJECT_B);
183+
testAuthenticateAsync(tokenBA, SUBJECT_A);
184+
testAuthenticateAsync(tokenBB, SUBJECT_B);
185+
}
186+
187+
164188
private AuthenticationState newAuthState(String token, String expectedSubject) throws Exception {
165189
// Must pass the token to the newAuthState for legacy reasons.
166190
AuthenticationState authState = authProvider.newAuthState(

0 commit comments

Comments
 (0)