Skip to content

Commit e8695bf

Browse files
[feat][broker] OneStageAuth State: move authn out of constructor (apache#19295)
1 parent 8d81392 commit e8695bf

File tree

3 files changed

+210
-18
lines changed

3 files changed

+210
-18
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/OneStageAuthenticationState.java

+72-17
Original file line numberDiff line numberDiff line change
@@ -20,39 +20,60 @@
2020

2121
import static java.nio.charset.StandardCharsets.UTF_8;
2222
import java.net.SocketAddress;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.ExecutionException;
2325
import javax.naming.AuthenticationException;
2426
import javax.net.ssl.SSLSession;
2527
import javax.servlet.http.HttpServletRequest;
2628
import org.apache.pulsar.common.api.AuthData;
2729

2830
/**
29-
* Interface for authentication state.
30-
*
31-
* It tell broker whether the authentication is completed or not,
32-
* if completed, what is the AuthRole is.
31+
* A class to track single stage authentication. This class assumes that:
32+
* 1. {@link #authenticateAsync(AuthData)} is called once and when the {@link CompletableFuture} completes,
33+
* authentication is complete.
34+
* 2. Authentication does not expire, so {@link #isExpired()} always returns false.
35+
* <p>
36+
* See {@link AuthenticationState} for Pulsar's contract on how this interface is used by Pulsar.
3337
*/
3438
public class OneStageAuthenticationState implements AuthenticationState {
3539

36-
private final AuthenticationDataSource authenticationDataSource;
37-
private final String authRole;
40+
private AuthenticationDataSource authenticationDataSource;
41+
private final SocketAddress remoteAddress;
42+
private final SSLSession sslSession;
43+
private final AuthenticationProvider provider;
44+
private volatile String authRole;
45+
3846

47+
/**
48+
* Constructor for a {@link OneStageAuthenticationState} where there is no authentication performed during
49+
* initialization.
50+
* @param remoteAddress - remoteAddress associated with the {@link AuthenticationState}
51+
* @param sslSession - sslSession associated with the {@link AuthenticationState}
52+
* @param provider - {@link AuthenticationProvider} to use to verify {@link AuthData}
53+
*/
3954
public OneStageAuthenticationState(AuthData authData,
4055
SocketAddress remoteAddress,
4156
SSLSession sslSession,
42-
AuthenticationProvider provider) throws AuthenticationException {
43-
this.authenticationDataSource = new AuthenticationDataCommand(
44-
new String(authData.getBytes(), UTF_8), remoteAddress, sslSession);
45-
this.authRole = provider.authenticate(authenticationDataSource);
57+
AuthenticationProvider provider) {
58+
this.provider = provider;
59+
this.remoteAddress = remoteAddress;
60+
this.sslSession = sslSession;
4661
}
4762

48-
public OneStageAuthenticationState(HttpServletRequest request, AuthenticationProvider provider)
49-
throws AuthenticationException {
63+
public OneStageAuthenticationState(HttpServletRequest request, AuthenticationProvider provider) {
64+
// Must initialize this here for backwards compatibility with http authentication
5065
this.authenticationDataSource = new AuthenticationDataHttps(request);
51-
this.authRole = provider.authenticate(authenticationDataSource);
66+
this.provider = provider;
67+
// These are not used when invoking this constructor.
68+
this.remoteAddress = null;
69+
this.sslSession = null;
5270
}
5371

5472
@Override
55-
public String getAuthRole() {
73+
public String getAuthRole() throws AuthenticationException {
74+
if (authRole == null) {
75+
throw new AuthenticationException("Must authenticate before calling getAuthRole");
76+
}
5677
return authRole;
5778
}
5879

@@ -61,13 +82,47 @@ public AuthenticationDataSource getAuthDataSource() {
6182
return authenticationDataSource;
6283
}
6384

85+
/**
86+
* Warning: this method is not intended to be called concurrently.
87+
*/
88+
@Override
89+
public CompletableFuture<AuthData> authenticateAsync(AuthData authData) {
90+
if (authRole != null) {
91+
// Authentication is already completed
92+
return CompletableFuture.completedFuture(null);
93+
}
94+
this.authenticationDataSource = new AuthenticationDataCommand(
95+
new String(authData.getBytes(), UTF_8), remoteAddress, sslSession);
96+
97+
return provider
98+
.authenticateAsync(authenticationDataSource)
99+
.thenApply(role -> {
100+
this.authRole = role;
101+
// Single stage authentication always returns null
102+
return null;
103+
});
104+
}
105+
106+
/**
107+
* @deprecated use {@link #authenticateAsync(AuthData)}
108+
*/
109+
@Deprecated(since = "2.12.0")
64110
@Override
65-
public AuthData authenticate(AuthData authData) {
66-
return null;
111+
public AuthData authenticate(AuthData authData) throws AuthenticationException {
112+
try {
113+
return authenticateAsync(authData).get();
114+
} catch (InterruptedException | ExecutionException e) {
115+
throw new RuntimeException(e);
116+
}
67117
}
68118

119+
/**
120+
* @deprecated rely on result from {@link #authenticateAsync(AuthData)}. For more information, see the Javadoc
121+
* for {@link AuthenticationState#isComplete()}.
122+
*/
123+
@Deprecated(since = "2.12.0")
69124
@Override
70125
public boolean isComplete() {
71-
return true;
126+
return authRole != null;
72127
}
73128
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.pulsar.broker.authentication;
21+
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.when;
24+
import static org.testng.Assert.assertEquals;
25+
import static org.testng.Assert.assertFalse;
26+
import static org.testng.Assert.assertNotNull;
27+
import static org.testng.Assert.assertNull;
28+
import static org.testng.Assert.assertSame;
29+
import static org.testng.Assert.assertThrows;
30+
import static org.testng.Assert.assertTrue;
31+
import org.apache.pulsar.broker.ServiceConfiguration;
32+
import org.apache.pulsar.common.api.AuthData;
33+
import org.testng.annotations.Test;
34+
import javax.naming.AuthenticationException;
35+
import javax.servlet.http.HttpServletRequest;
36+
import java.io.IOException;
37+
import java.util.concurrent.CompletableFuture;
38+
import java.util.concurrent.atomic.LongAdder;
39+
40+
public class OneStageAuthenticationStateTest {
41+
42+
public static class CountingAuthenticationProvider implements AuthenticationProvider {
43+
public LongAdder authCallCount = new LongAdder();
44+
45+
@Override
46+
public void initialize(ServiceConfiguration config) throws IOException {
47+
}
48+
49+
@Override
50+
public String getAuthMethodName() {
51+
return null;
52+
}
53+
54+
@Override
55+
public void close() throws IOException {
56+
}
57+
58+
@Override
59+
public CompletableFuture<String> authenticateAsync(AuthenticationDataSource authData) {
60+
authCallCount.increment();
61+
return CompletableFuture.completedFuture(authData.getCommandData());
62+
}
63+
64+
public int getAuthCallCount() {
65+
return authCallCount.intValue();
66+
}
67+
}
68+
69+
@Test
70+
public void verifyAuthenticateAsyncIsCalledExactlyOnceAndSetsRole() throws Exception {
71+
CountingAuthenticationProvider provider = new CountingAuthenticationProvider();
72+
AuthData authData = AuthData.of("role".getBytes());
73+
OneStageAuthenticationState authState = new OneStageAuthenticationState(authData, null, null, provider);
74+
assertEquals(provider.getAuthCallCount(), 0, "Auth count should not increase yet");
75+
AuthData challenge = authState.authenticateAsync(authData).get();
76+
assertNull(challenge);
77+
assertEquals(provider.getAuthCallCount(), 1, "Call authenticate only once");
78+
assertEquals(authState.getAuthRole(), "role");
79+
AuthenticationDataSource firstAuthenticationDataSource = authState.getAuthDataSource();
80+
assertTrue(firstAuthenticationDataSource instanceof AuthenticationDataCommand);
81+
82+
// Verify subsequent call to authenticate does not change data
83+
AuthData secondChallenge = authState.authenticateAsync(AuthData.of("admin".getBytes())).get();
84+
assertNull(secondChallenge);
85+
assertEquals(authState.getAuthRole(), "role");
86+
AuthenticationDataSource secondAuthenticationDataSource = authState.getAuthDataSource();
87+
assertSame(secondAuthenticationDataSource, firstAuthenticationDataSource);
88+
assertEquals(provider.getAuthCallCount(), 1, "Call authenticate only once, even later.");
89+
}
90+
91+
@SuppressWarnings("deprecation")
92+
@Test
93+
public void verifyAuthenticateIsCalledExactlyOnceAndSetsRole() throws Exception {
94+
CountingAuthenticationProvider provider = new CountingAuthenticationProvider();
95+
AuthData authData = AuthData.of("role".getBytes());
96+
OneStageAuthenticationState authState = new OneStageAuthenticationState(authData, null, null, provider);
97+
assertEquals(provider.getAuthCallCount(), 0, "Auth count should not increase yet");
98+
assertFalse(authState.isComplete());
99+
AuthData challenge = authState.authenticate(authData);
100+
assertNull(challenge);
101+
assertTrue(authState.isComplete());
102+
assertEquals(provider.getAuthCallCount(), 1, "Call authenticate only once");
103+
assertEquals(authState.getAuthRole(), "role");
104+
AuthenticationDataSource firstAuthenticationDataSource = authState.getAuthDataSource();
105+
assertTrue(firstAuthenticationDataSource instanceof AuthenticationDataCommand);
106+
107+
// Verify subsequent call to authenticate does not change data
108+
AuthData secondChallenge = authState.authenticate(AuthData.of("admin".getBytes()));
109+
assertNull(secondChallenge);
110+
assertEquals(authState.getAuthRole(), "role");
111+
AuthenticationDataSource secondAuthenticationDataSource = authState.getAuthDataSource();
112+
assertSame(secondAuthenticationDataSource, firstAuthenticationDataSource);
113+
assertEquals(provider.getAuthCallCount(), 1, "Call authenticate only once, even later.");
114+
}
115+
116+
@Test
117+
public void verifyGetAuthRoleBeforeAuthenticateFails() {
118+
CountingAuthenticationProvider provider = new CountingAuthenticationProvider();
119+
AuthData authData = AuthData.of("role".getBytes());
120+
OneStageAuthenticationState authState = new OneStageAuthenticationState(authData, null, null, provider);
121+
assertThrows(AuthenticationException.class, authState::getAuthRole);
122+
assertNull(authState.getAuthDataSource());
123+
}
124+
125+
@Test
126+
public void verifyHttpAuthConstructorInitializesAuthDataSourceAndDoesNotAuthenticateData() {
127+
HttpServletRequest request = mock(HttpServletRequest.class);
128+
when(request.getRemoteAddr()).thenReturn("localhost");
129+
when(request.getRemotePort()).thenReturn(8080);
130+
CountingAuthenticationProvider provider = new CountingAuthenticationProvider();
131+
OneStageAuthenticationState authState = new OneStageAuthenticationState(request, provider);
132+
assertNotNull(authState.getAuthDataSource());
133+
assertEquals(provider.getAuthCallCount(), 0);
134+
}
135+
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -947,10 +947,12 @@ protected void handleConnect(CommandConnect connect) {
947947
+ " using auth method [%s] is not available", originalAuthMethod));
948948
}
949949

950+
AuthData originalAuthDataCopy = AuthData.of(connect.getOriginalAuthData().getBytes());
950951
originalAuthState = originalAuthenticationProvider.newAuthState(
951-
AuthData.of(connect.getOriginalAuthData().getBytes()),
952+
originalAuthDataCopy,
952953
remoteAddress,
953954
sslSession);
955+
originalAuthState.authenticate(originalAuthDataCopy);
954956
originalAuthData = originalAuthState.getAuthDataSource();
955957
originalPrincipal = originalAuthState.getAuthRole();
956958

0 commit comments

Comments
 (0)