Skip to content

Commit

Permalink
[broker][authentication]Support pass http auth status (#14044)
Browse files Browse the repository at this point in the history
Fixes #14404

*(or if this PR is one task of a github issue, please add `Master Issue: #<xyz>` to link to the master issue.)*

Master Issue: #<xyz>

### Motivation

Currently, pulsar auth is divided into two parts, one is the authn and authz of the pulsar protocol (e.g. produce and consume) and the other is the authn and authz of the HTTP protocol (e.g. management of pulsar clusters), auth is divided into two phases authn and authz, currently in the authn phase will return a string role, authz phase will check this role's permissions, The string role contains very little information and that blocks some work in the authz phase, so in pulsar, there is an interface `AuthenticationDataSource` which is used to pass more information from the authn to the authz phase

In auth, there are two classes `AuthenticationDataHttps` and `AuthenticationDataCommand` that implement this interface `AuthenticationDataSource`. AuthenticationDataCommand is used to pass the state information after the pulsar protocol authentication. `AuthenticationDataHttps` is used to pass the status information after the HTTP protocol authentication. `AuthenticationDataCommand` and `AuthenticationDataHttps` are both default implementations, but now for the pulsar protocol there is support for using user-defined implementations https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L817, that gives the user the ability to extend the auth state and pass more information, but for the HTTP protocol data does not yet support the use of user-defined data, this pr implementation it.

### Modifications

* Add a new interface `newHttpAuthState` for passing HTTP auth state
* Set auth method name for pulsar client
* Fixed wrong method signatures
  • Loading branch information
tuteng authored Feb 28, 2022
1 parent c35ac3f commit 330fcb9
Show file tree
Hide file tree
Showing 29 changed files with 563 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ default AuthenticationState newAuthState(AuthData authData,
return new OneStageAuthenticationState(authData, remoteAddress, sslSession, this);
}

/**
* Create an http authentication data State use passed in AuthenticationDataSource.
*/
default AuthenticationState newHttpAuthState(HttpServletRequest request)
throws AuthenticationException {
return new OneStageAuthenticationState(request, this);
}

/**
* Validate the authentication for the given credentials with the specified authentication data.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,37 @@ public AuthenticationState newAuthState(AuthData authData, SocketAddress remoteA
}
}

@Override
public AuthenticationState newHttpAuthState(HttpServletRequest request) throws AuthenticationException {
final List<AuthenticationState> states = new ArrayList<>(providers.size());

AuthenticationException authenticationException = null;
try {
applyAuthProcessor(
providers,
provider -> {
AuthenticationState state = provider.newHttpAuthState(request);
states.add(state);
return state;
}
);
} catch (AuthenticationException ae) {
authenticationException = ae;
}
if (states.isEmpty()) {
log.debug("Failed to initialize a new http auth state from {}",
request.getRemoteHost(), authenticationException);
if (authenticationException != null) {
throw authenticationException;
} else {
throw new AuthenticationException(
"Failed to initialize a new http auth state from " + request.getRemoteHost());
}
} else {
return new AuthenticationListState(states);
}
}

@Override
public boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response) throws Exception {
Boolean authenticated = applyAuthProcessor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.List;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
Expand Down Expand Up @@ -165,6 +166,11 @@ public AuthenticationState newAuthState(AuthData authData, SocketAddress remoteA
return new TokenAuthenticationState(this, authData, remoteAddress, sslSession);
}

@Override
public AuthenticationState newHttpAuthState(HttpServletRequest request) throws AuthenticationException {
return new TokenAuthenticationState(this, request);
}

public static String getToken(AuthenticationDataSource authData) throws AuthenticationException {
if (authData.hasDataFromCommand()) {
// Authenticate Pulsar binary connection
Expand Down Expand Up @@ -310,8 +316,6 @@ private static final class TokenAuthenticationState implements AuthenticationSta
private final AuthenticationProviderToken provider;
private AuthenticationDataSource authenticationDataSource;
private Jwt<?, Claims> jwt;
private final SocketAddress remoteAddress;
private final SSLSession sslSession;
private long expiration;

TokenAuthenticationState(
Expand All @@ -320,31 +324,50 @@ private static final class TokenAuthenticationState implements AuthenticationSta
SocketAddress remoteAddress,
SSLSession sslSession) throws AuthenticationException {
this.provider = provider;
this.remoteAddress = remoteAddress;
this.sslSession = sslSession;
this.authenticate(authData);
String token = new String(authData.getBytes(), UTF_8);
this.authenticationDataSource = new AuthenticationDataCommand(token, remoteAddress, sslSession);
this.checkExpiration(token);
}

TokenAuthenticationState(
AuthenticationProviderToken provider,
HttpServletRequest request) throws AuthenticationException {
this.provider = provider;
String httpHeaderValue = request.getHeader(HTTP_HEADER_NAME);
if (httpHeaderValue == null || !httpHeaderValue.startsWith(HTTP_HEADER_VALUE_PREFIX)) {
throw new AuthenticationException("Invalid HTTP Authorization header");
}

// Remove prefix
String token = httpHeaderValue.substring(HTTP_HEADER_VALUE_PREFIX.length());
this.authenticationDataSource = new AuthenticationDataHttps(request);
this.checkExpiration(token);
}

@Override
public String getAuthRole() throws AuthenticationException {
return provider.getPrincipal(jwt);
}

/**
* @param authData Authentication data.
* @return null. Explanation of returning null values, {@link AuthenticationState#authenticateAsync(AuthData)}
* @throws AuthenticationException
*/
@Override
public AuthData authenticate(AuthData authData) throws AuthenticationException {
String token = new String(authData.getBytes(), UTF_8);
// There's no additional auth stage required
return null;
}

private void checkExpiration(String token) throws AuthenticationException {
this.jwt = provider.authenticateToken(token);
this.authenticationDataSource = new AuthenticationDataCommand(token, remoteAddress, sslSession);
if (jwt.getBody().getExpiration() != null) {
this.expiration = jwt.getBody().getExpiration().getTime();
} else {
// Disable expiration
this.expiration = Long.MAX_VALUE;
}

// There's no additional auth stage required
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -84,10 +85,10 @@ public AuthenticationService(ServiceConfiguration conf) throws PulsarServerExcep
}
}

public String authenticateHttpRequest(HttpServletRequest request) throws AuthenticationException {
public String authenticateHttpRequest(HttpServletRequest request, AuthenticationDataSource authData)
throws AuthenticationException {
AuthenticationException authenticationException = null;
AuthenticationDataSource authData = new AuthenticationDataHttps(request);
String authMethodName = request.getHeader("X-Pulsar-Auth-Method-Name");
String authMethodName = request.getHeader(AuthenticationFilter.PULSAR_AUTH_METHOD_NAME);

if (authMethodName != null) {
AuthenticationProvider providerToUse = providers.get(authMethodName);
Expand All @@ -96,6 +97,11 @@ public String authenticateHttpRequest(HttpServletRequest request) throws Authent
String.format("Unsupported authentication method: [%s].", authMethodName));
}
try {
if (authData == null) {
AuthenticationState authenticationState = providerToUse.newHttpAuthState(request);
authData = authenticationState.getAuthDataSource();
}
// Backward compatible, the authData value was null in the previous implementation
return providerToUse.authenticate(authData);
} catch (AuthenticationException e) {
if (LOG.isDebugEnabled()) {
Expand All @@ -109,7 +115,8 @@ public String authenticateHttpRequest(HttpServletRequest request) throws Authent
} else {
for (AuthenticationProvider provider : providers.values()) {
try {
return provider.authenticate(authData);
AuthenticationState authenticationState = provider.newHttpAuthState(request);
return provider.authenticate(authenticationState.getAuthDataSource());
} catch (AuthenticationException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication failed for provider " + provider.getAuthMethodName() + ": "
Expand Down Expand Up @@ -137,6 +144,15 @@ public String authenticateHttpRequest(HttpServletRequest request) throws Authent
}
}

/**
* Mark this function as deprecated, it is recommended to use a method with the AuthenticationDataSource
* signature to implement it.
*/
@Deprecated
public String authenticateHttpRequest(HttpServletRequest request) throws AuthenticationException {
return authenticateHttpRequest(request, null);
}

public AuthenticationProvider getAuthenticationProvider(String authMethodName) {
return providers.get(authMethodName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.SocketAddress;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.common.api.AuthData;

/**
Expand All @@ -45,6 +46,12 @@ public OneStageAuthenticationState(AuthData authData,
this.authRole = provider.authenticate(authenticationDataSource);
}

public OneStageAuthenticationState(HttpServletRequest request, AuthenticationProvider provider)
throws AuthenticationException {
this.authenticationDataSource = new AuthenticationDataHttps(request);
this.authRole = provider.authenticate(authenticationDataSource);
}

@Override
public String getAuthRole() {
return authRole;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.sasl.SaslConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -44,6 +45,8 @@ public class AuthenticationFilter implements Filter {

public static final String AuthenticatedRoleAttributeName = AuthenticationFilter.class.getName() + "-role";
public static final String AuthenticatedDataAttributeName = AuthenticationFilter.class.getName() + "-data";
public static final String PULSAR_AUTH_METHOD_NAME = "X-Pulsar-Auth-Method-Name";


public AuthenticationFilter(AuthenticationService authenticationService) {
this.authenticationService = authenticationService;
Expand Down Expand Up @@ -71,10 +74,21 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha

if (!isSaslRequest(httpRequest)) {
// not sasl type, return role directly.
String role = authenticationService.authenticateHttpRequest((HttpServletRequest) request);
String authMethodName = httpRequest.getHeader(PULSAR_AUTH_METHOD_NAME);
String role;
if (authMethodName != null && authenticationService.getAuthenticationProvider(authMethodName) != null) {
AuthenticationState authenticationState = authenticationService
.getAuthenticationProvider(authMethodName).newHttpAuthState(httpRequest);
request.setAttribute(AuthenticatedDataAttributeName, authenticationState.getAuthDataSource());
role = authenticationService.authenticateHttpRequest(
(HttpServletRequest) request, authenticationState.getAuthDataSource());
} else {
request.setAttribute(AuthenticatedDataAttributeName,
new AuthenticationDataHttps((HttpServletRequest) request));
role = authenticationService.authenticateHttpRequest((HttpServletRequest) request);
}
request.setAttribute(AuthenticatedRoleAttributeName, role);
request.setAttribute(AuthenticatedDataAttributeName,
new AuthenticationDataHttps((HttpServletRequest) request));

if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authenticated HTTP request with role {}", request.getRemoteAddr(), role);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.pulsar.broker.authentication;

import static java.nio.charset.StandardCharsets.UTF_8;
import javax.servlet.http.HttpServletRequest;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -165,6 +168,14 @@ private AuthenticationState newAuthState(String token, String expectedSubject) t
return authState;
}

private AuthenticationState newHttpAuthState(HttpServletRequest request, String expectedSubject) throws Exception {
AuthenticationState authState = authProvider.newHttpAuthState(request);
assertEquals(authState.getAuthRole(), expectedSubject);
assertTrue(authState.isComplete());
assertFalse(authState.isExpired());
return authState;
}

private void verifyAuthStateExpired(AuthenticationState authState, String expectedSubject)
throws Exception {
assertEquals(authState.getAuthRole(), expectedSubject);
Expand All @@ -188,4 +199,38 @@ public void testNewAuthState() throws Exception {

}

@Test
public void testNewHttpAuthState() throws Exception {
HttpServletRequest requestAA = mock(HttpServletRequest.class);
when(requestAA.getRemoteAddr()).thenReturn("127.0.0.1");
when(requestAA.getRemotePort()).thenReturn(8080);
when(requestAA.getHeader("Authorization")).thenReturn("Bearer " + expiringTokenAA);
AuthenticationState authStateAA = newHttpAuthState(requestAA, SUBJECT_A);

HttpServletRequest requestAB = mock(HttpServletRequest.class);
when(requestAB.getRemoteAddr()).thenReturn("127.0.0.1");
when(requestAB.getRemotePort()).thenReturn(8080);
when(requestAB.getHeader("Authorization")).thenReturn("Bearer " + expiringTokenAB);
AuthenticationState authStateAB = newHttpAuthState(requestAB, SUBJECT_B);

HttpServletRequest requestBA = mock(HttpServletRequest.class);
when(requestBA.getRemoteAddr()).thenReturn("127.0.0.1");
when(requestBA.getRemotePort()).thenReturn(8080);
when(requestBA.getHeader("Authorization")).thenReturn("Bearer " + expiringTokenBA);
AuthenticationState authStateBA = newHttpAuthState(requestBA, SUBJECT_A);

HttpServletRequest requestBB = mock(HttpServletRequest.class);
when(requestBB.getRemoteAddr()).thenReturn("127.0.0.1");
when(requestBB.getRemotePort()).thenReturn(8080);
when(requestBB.getHeader("Authorization")).thenReturn("Bearer " + expiringTokenBB);
AuthenticationState authStateBB = newHttpAuthState(requestBB, SUBJECT_B);

Thread.sleep(TimeUnit.SECONDS.toMillis(6));

verifyAuthStateExpired(authStateAA, SUBJECT_A);
verifyAuthStateExpired(authStateAB, SUBJECT_B);
verifyAuthStateExpired(authStateBA, SUBJECT_A);
verifyAuthStateExpired(authStateBB, SUBJECT_B);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.namespace.LookupOptions;
Expand Down Expand Up @@ -137,8 +136,8 @@ public String originalPrincipal() {
return httpRequest.getHeader(ORIGINAL_PRINCIPAL_HEADER);
}

public AuthenticationDataHttps clientAuthData() {
return (AuthenticationDataHttps) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedDataAttributeName);
public AuthenticationDataSource clientAuthData() {
return (AuthenticationDataSource) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedDataAttributeName);
}

public boolean isRequestHttps() {
Expand Down Expand Up @@ -1175,7 +1174,8 @@ && pulsar().getBrokerService().isAuthorizationEnabled()) {
return FutureUtil.failedFuture(
new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request"));
}
AuthenticationDataHttps authData = clientAuthData();

AuthenticationDataSource authData = clientAuthData();
authData.setSubscription(subscription);
return pulsar().getBrokerService().getAuthorizationService()
.allowTopicOperationAsync(topicName, operation, originalPrincipal(), clientAppId(), authData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Stable
public interface AuthenticationDataProvider extends Serializable {

String PULSAR_AUTH_METHOD_NAME = "X-Pulsar-Auth-Method-Name";
/*
* TLS
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.api.PulsarClientException;

public class AuthenticationBasic implements Authentication, EncodedAuthenticationParameterSupport {
static final String AUTH_METHOD_NAME = "basic";
private String userId;
private String password;

Expand All @@ -39,7 +40,7 @@ public void close() throws IOException {

@Override
public String getAuthMethodName() {
return "basic";
return AUTH_METHOD_NAME;
}

@Override
Expand Down
Loading

0 comments on commit 330fcb9

Please sign in to comment.