Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] Update AuthenticationProvider to simplify HTTP Authn #12

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.authentication;

import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedDataAttributeName;
import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedRoleAttributeName;
import java.io.Closeable;
import java.io.IOException;
import java.net.SocketAddress;
Expand All @@ -28,7 +30,6 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.util.FutureUtil;

/**
Expand Down Expand Up @@ -103,7 +104,16 @@ default AuthenticationState newAuthState(AuthData authData,

/**
* Create an http authentication data State use passed in AuthenticationDataSource.
* @deprecated implementations that previously relied on this should update their implementation of
* {@link #authenticateHttpRequest(HttpServletRequest, HttpServletResponse)} or of
* {@link #authenticateHttpRequestAsync(HttpServletRequest, HttpServletResponse)} so that the desired attributes
* are added in those methods.
*
* <p>Note: this method was only ever used to generate an {@link AuthenticationState} object in order to generate
* an {@link AuthenticationDataSource} that was added as the {@link AuthenticatedDataAttributeName} attribute to
* the http request. Removing this method removes an unnecessary step in the authentication flow.</p>
*/
@Deprecated(since = "2.12.0")
default AuthenticationState newHttpAuthState(HttpServletRequest request)
throws AuthenticationException {
return new OneStageAuthenticationState(request, this);
Expand All @@ -112,20 +122,17 @@ default AuthenticationState newHttpAuthState(HttpServletRequest request)
/**
* Validate the authentication for the given credentials with the specified authentication data.
*
* <p>Implementations of this method MUST modify the request by adding the {@link AuthenticatedRoleAttributeName}
* and the {@link AuthenticatedDataAttributeName} attributes.</p>
*
* <p>Warning: the calling thread is an IO thread. Any implementations that rely on blocking behavior
* must ensure that the execution is completed on using a separate thread pool to ensure IO threads
* are never blocked.</p>
*
* <p>Note: this method is marked as unstable because the Pulsar code base only calls it for the
* Pulsar Broker Auth SASL plugin. All non SASL HTTP requests are authenticated using the
* {@link AuthenticationProvider#authenticateAsync(AuthenticationDataSource)} method. As such,
* this method might be removed in favor of the SASL provider implementing the
* {@link AuthenticationProvider#authenticateAsync(AuthenticationDataSource)} method.</p>
*
* @return Set response, according to passed in request.
* @return Set response, according to passed in request, and return whether we should do following chain.doFilter.
* @throws Exception when authentication failed
* and return whether we should do following chain.doFilter or not.
*/
@InterfaceStability.Unstable
default CompletableFuture<Boolean> authenticateHttpRequestAsync(HttpServletRequest request,
HttpServletResponse response) {
try {
Expand All @@ -138,10 +145,20 @@ default CompletableFuture<Boolean> authenticateHttpRequestAsync(HttpServletReque
/**
* Set response, according to passed in request.
* and return whether we should do following chain.doFilter or not.
*
* <p>Implementations of this method MUST modify the request by adding the {@link AuthenticatedRoleAttributeName}
* and the {@link AuthenticatedDataAttributeName} attributes.</p>
*
* @return Set response, according to passed in request, and return whether we should do following chain.doFilter.
* @throws Exception when authentication failed
* @deprecated use and implement {@link AuthenticationProvider#authenticateHttpRequestAsync} instead.
*/
@Deprecated
default boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response) throws Exception {
throw new AuthenticationException("Not supported");
AuthenticationState authenticationState = newHttpAuthState(request);
String role = authenticate(authenticationState.getAuthDataSource());
request.setAttribute(AuthenticatedRoleAttributeName, role);
request.setAttribute(AuthenticatedDataAttributeName, authenticationState.getAuthDataSource());
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.authentication;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedDataAttributeName;
import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedRoleAttributeName;
import com.google.common.annotations.VisibleForTesting;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.ExpiredJwtException;
Expand All @@ -39,6 +41,7 @@
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
Expand Down Expand Up @@ -160,6 +163,20 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
}
}

@Override
public boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response) throws Exception {
HttpServletRequestWrapper wrappedRequest = new HttpServletRequestWrapper(request);
String httpHeaderValue = wrappedRequest.getHeader(HTTP_HEADER_NAME);
if (httpHeaderValue == null || !httpHeaderValue.startsWith(HTTP_HEADER_VALUE_PREFIX)) {
throw new AuthenticationException("Invalid HTTP Authorization header");
}
AuthenticationDataSource authenticationDataSource = new AuthenticationDataHttps(wrappedRequest);
String role = authenticate(authenticationDataSource);
request.setAttribute(AuthenticatedRoleAttributeName, role);
request.setAttribute(AuthenticatedDataAttributeName, authenticationDataSource);
return true;
}

@Override
public AuthenticationState newAuthState(AuthData authData, SocketAddress remoteAddress, SSLSession sslSession)
throws AuthenticationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.authentication;

import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedDataAttributeName;
import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedRoleAttributeName;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -28,10 +30,12 @@
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.common.sasl.SaslConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,16 +89,77 @@ public AuthenticationService(ServiceConfiguration conf) throws PulsarServerExcep
}
}

private String getAuthMethodName(HttpServletRequest request) {
return request.getHeader(AuthenticationFilter.PULSAR_AUTH_METHOD_NAME);
}

private AuthenticationProvider getAuthProvider(String authMethodName) throws AuthenticationException {
AuthenticationProvider providerToUse = providers.get(authMethodName);
if (providerToUse == null) {
throw new AuthenticationException(
String.format("Unsupported authentication method: [%s].", authMethodName));
}
return providerToUse;
}

public boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response)
throws Exception {
String authMethodName = getAuthMethodName(request);
if (authMethodName == null
&& SaslConstants.SASL_TYPE_VALUE.equalsIgnoreCase(request.getHeader(SaslConstants.SASL_HEADER_TYPE))) {
// This edge case must be handled because the Pulsar SASL implementation does not add the
// X-Pulsar-Auth-Method-Name header.
authMethodName = SaslConstants.AUTH_METHOD_NAME;
}
if (authMethodName != null) {
AuthenticationProvider providerToUse = getAuthProvider(authMethodName);
try {
return providerToUse.authenticateHttpRequest(request, response);
} catch (AuthenticationException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication failed for provider " + providerToUse.getAuthMethodName() + " : "
+ e.getMessage(), e);
}
throw e;
}
} else {
for (AuthenticationProvider provider : providers.values()) {
try {
return provider.authenticateHttpRequest(request, response);
} catch (AuthenticationException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication failed for provider " + provider.getAuthMethodName() + ": "
+ e.getMessage(), e);
}
// Ignore the exception because we don't know which authentication method is expected here.
}
}
// No authentication provided
if (!providers.isEmpty()) {
if (StringUtils.isNotBlank(anonymousUserRole)) {
request.setAttribute(AuthenticatedRoleAttributeName, anonymousUserRole);
request.setAttribute(AuthenticatedDataAttributeName, new AuthenticationDataHttps(request));
return true;
}
// If at least a provider was configured, then the authentication needs to be provider
throw new AuthenticationException("Authentication required");
} else {
// No authentication required
return true;
}
}
}

/**
* @deprecated use {@link #authenticateHttpRequest(HttpServletRequest, HttpServletResponse)}
*/
@Deprecated(since = "2.12.0")
public String authenticateHttpRequest(HttpServletRequest request, AuthenticationDataSource authData)
throws AuthenticationException {
String authMethodName = request.getHeader(AuthenticationFilter.PULSAR_AUTH_METHOD_NAME);
String authMethodName = getAuthMethodName(request);

if (authMethodName != null) {
AuthenticationProvider providerToUse = providers.get(authMethodName);
if (providerToUse == null) {
throw new AuthenticationException(
String.format("Unsupported authentication method: [%s].", authMethodName));
}
AuthenticationProvider providerToUse = getAuthProvider(authMethodName);
try {
if (authData == null) {
AuthenticationState authenticationState = providerToUse.newHttpAuthState(request);
Expand Down Expand Up @@ -140,10 +205,11 @@ public String authenticateHttpRequest(HttpServletRequest request, Authentication
/**
* Mark this function as deprecated, it is recommended to use a method with the AuthenticationDataSource
* signature to implement it.
* @deprecated use {@link #authenticateHttpRequest(HttpServletRequest, HttpServletResponse)}.
*/
@Deprecated
public String authenticateHttpRequest(HttpServletRequest request) throws AuthenticationException {
return authenticateHttpRequest(request, null);
return authenticateHttpRequest(request, (AuthenticationDataSource) null);
}

public AuthenticationProvider getAuthenticationProvider(String authMethodName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.sasl.SaslConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -52,54 +49,12 @@ public AuthenticationFilter(AuthenticationService authenticationService) {
this.authenticationService = authenticationService;
}

private boolean isSaslRequest(HttpServletRequest request) {
if (request.getHeader(SaslConstants.SASL_HEADER_TYPE) == null
|| request.getHeader(SaslConstants.SASL_HEADER_TYPE).isEmpty()) {
return false;
}
if (request.getHeader(SaslConstants.SASL_HEADER_TYPE)
.equalsIgnoreCase(SaslConstants.SASL_TYPE_VALUE)) {
return true;
} else {
return false;
}
}

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
try {
HttpServletRequest httpRequest = (HttpServletRequest) request;
HttpServletResponse httpResponse = (HttpServletResponse) response;

if (!isSaslRequest(httpRequest)) {
// not sasl type, return role directly.
String authMethodName = httpRequest.getHeader(PULSAR_AUTH_METHOD_NAME);
String role;
if (authMethodName != null && authenticationService.getAuthenticationProvider(authMethodName) != null) {
AuthenticationState authenticationState = authenticationService
.getAuthenticationProvider(authMethodName).newHttpAuthState(httpRequest);
request.setAttribute(AuthenticatedDataAttributeName, authenticationState.getAuthDataSource());
role = authenticationService.authenticateHttpRequest(
(HttpServletRequest) request, authenticationState.getAuthDataSource());
} else {
request.setAttribute(AuthenticatedDataAttributeName,
new AuthenticationDataHttps((HttpServletRequest) request));
role = authenticationService.authenticateHttpRequest((HttpServletRequest) request);
}
request.setAttribute(AuthenticatedRoleAttributeName, role);

if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authenticated HTTP request with role {}", request.getRemoteAddr(), role);
}
chain.doFilter(request, response);
return;
}

boolean doFilter = authenticationService
.getAuthenticationProvider(SaslConstants.AUTH_METHOD_NAME)
.authenticateHttpRequest(httpRequest, httpResponse);

.authenticateHttpRequest((HttpServletRequest) request, (HttpServletResponse) response);
if (doFilter) {
chain.doFilter(request, response);
}
Expand All @@ -111,7 +66,6 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
} else {
LOG.error("[{}] Error performing authentication for HTTP", request.getRemoteAddr(), e);
}
return;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,8 +885,8 @@ public void testTokenFromHttpParams() throws Exception {
doReturn("127.0.0.1").when(servletRequest).getRemoteAddr();
doReturn(0).when(servletRequest).getRemotePort();

AuthenticationState authState = provider.newHttpAuthState(servletRequest);
provider.authenticate(authState.getAuthDataSource());
boolean doFilter = provider.authenticateHttpRequest(servletRequest, null);
assertTrue(doFilter, "Authentication should have passed");
}

@Test
Expand All @@ -910,8 +910,8 @@ public void testTokenFromHttpHeaders() throws Exception {
doReturn("127.0.0.1").when(servletRequest).getRemoteAddr();
doReturn(0).when(servletRequest).getRemotePort();

AuthenticationState authState = provider.newHttpAuthState(servletRequest);
provider.authenticate(authState.getAuthDataSource());
boolean doFilter = provider.authenticateHttpRequest(servletRequest, null);
assertTrue(doFilter, "Authentication should have passed");
}

private static String createTokenWithAudience(Key signingKey, String audienceClaim, List<String> audience) {
Expand Down
Loading