-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[feat][broker] OneStageAuth State: move authn out of constructor #19295
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,39 +20,60 @@ | |
|
||
import static java.nio.charset.StandardCharsets.UTF_8; | ||
import java.net.SocketAddress; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ExecutionException; | ||
import javax.naming.AuthenticationException; | ||
import javax.net.ssl.SSLSession; | ||
import javax.servlet.http.HttpServletRequest; | ||
import org.apache.pulsar.common.api.AuthData; | ||
|
||
/** | ||
* Interface for authentication state. | ||
* | ||
* It tell broker whether the authentication is completed or not, | ||
* if completed, what is the AuthRole is. | ||
* A class to track single stage authentication. This class assumes that: | ||
* 1. {@link #authenticateAsync(AuthData)} is called once and when the {@link CompletableFuture} completes, | ||
* authentication is complete. | ||
* 2. Authentication does not expire, so {@link #isExpired()} always returns false. | ||
* <p> | ||
* See {@link AuthenticationState} for Pulsar's contract on how this interface is used by Pulsar. | ||
*/ | ||
public class OneStageAuthenticationState implements AuthenticationState { | ||
|
||
private final AuthenticationDataSource authenticationDataSource; | ||
private final String authRole; | ||
private AuthenticationDataSource authenticationDataSource; | ||
private final SocketAddress remoteAddress; | ||
private final SSLSession sslSession; | ||
private final AuthenticationProvider provider; | ||
private volatile String authRole; | ||
|
||
|
||
/** | ||
* Constructor for a {@link OneStageAuthenticationState} where there is no authentication performed during | ||
* initialization. | ||
* @param remoteAddress - remoteAddress associated with the {@link AuthenticationState} | ||
* @param sslSession - sslSession associated with the {@link AuthenticationState} | ||
* @param provider - {@link AuthenticationProvider} to use to verify {@link AuthData} | ||
*/ | ||
public OneStageAuthenticationState(AuthData authData, | ||
SocketAddress remoteAddress, | ||
SSLSession sslSession, | ||
AuthenticationProvider provider) throws AuthenticationException { | ||
this.authenticationDataSource = new AuthenticationDataCommand( | ||
new String(authData.getBytes(), UTF_8), remoteAddress, sslSession); | ||
this.authRole = provider.authenticate(authenticationDataSource); | ||
AuthenticationProvider provider) { | ||
this.provider = provider; | ||
this.remoteAddress = remoteAddress; | ||
this.sslSession = sslSession; | ||
} | ||
|
||
public OneStageAuthenticationState(HttpServletRequest request, AuthenticationProvider provider) | ||
throws AuthenticationException { | ||
public OneStageAuthenticationState(HttpServletRequest request, AuthenticationProvider provider) { | ||
// Must initialize this here for backwards compatibility with http authentication | ||
this.authenticationDataSource = new AuthenticationDataHttps(request); | ||
this.authRole = provider.authenticate(authenticationDataSource); | ||
this.provider = provider; | ||
// These are not used when invoking this constructor. | ||
this.remoteAddress = null; | ||
this.sslSession = null; | ||
} | ||
|
||
@Override | ||
public String getAuthRole() { | ||
public String getAuthRole() throws AuthenticationException { | ||
if (authRole == null) { | ||
throw new AuthenticationException("Must authenticate before calling getAuthRole"); | ||
} | ||
return authRole; | ||
} | ||
|
||
|
@@ -61,13 +82,47 @@ public AuthenticationDataSource getAuthDataSource() { | |
return authenticationDataSource; | ||
} | ||
|
||
/** | ||
* Warning: this method is not intended to be called concurrently. | ||
*/ | ||
@Override | ||
public CompletableFuture<AuthData> authenticateAsync(AuthData authData) { | ||
if (authRole != null) { | ||
// Authentication is already completed | ||
return CompletableFuture.completedFuture(null); | ||
} | ||
this.authenticationDataSource = new AuthenticationDataCommand( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have one question: When having It looks like so. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is a class for single stage authentication, there are no If this were a multi-stage auth, this solution would not work. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Your means that a single-stage authentication doesn't refresh authentication data, if so, when having There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, that is what I mean. However, I don't agree that it means we need to create a new My understanding of the the protocol is that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good, I see. |
||
new String(authData.getBytes(), UTF_8), remoteAddress, sslSession); | ||
|
||
return provider | ||
.authenticateAsync(authenticationDataSource) | ||
.thenApply(role -> { | ||
this.authRole = role; | ||
// Single stage authentication always returns null | ||
return null; | ||
}); | ||
} | ||
|
||
/** | ||
* @deprecated use {@link #authenticateAsync(AuthData)} | ||
*/ | ||
@Deprecated(since = "2.12.0") | ||
@Override | ||
public AuthData authenticate(AuthData authData) { | ||
return null; | ||
public AuthData authenticate(AuthData authData) throws AuthenticationException { | ||
try { | ||
return authenticateAsync(authData).get(); | ||
} catch (InterruptedException | ExecutionException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
/** | ||
* @deprecated rely on result from {@link #authenticateAsync(AuthData)}. For more information, see the Javadoc | ||
* for {@link AuthenticationState#isComplete()}. | ||
*/ | ||
@Deprecated(since = "2.12.0") | ||
@Override | ||
public boolean isComplete() { | ||
return true; | ||
return authRole != null; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.pulsar.broker.authentication; | ||
|
||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.when; | ||
import static org.testng.Assert.assertEquals; | ||
import static org.testng.Assert.assertFalse; | ||
import static org.testng.Assert.assertNotNull; | ||
import static org.testng.Assert.assertNull; | ||
import static org.testng.Assert.assertSame; | ||
import static org.testng.Assert.assertThrows; | ||
import static org.testng.Assert.assertTrue; | ||
import org.apache.pulsar.broker.ServiceConfiguration; | ||
import org.apache.pulsar.common.api.AuthData; | ||
import org.testng.annotations.Test; | ||
import javax.naming.AuthenticationException; | ||
import javax.servlet.http.HttpServletRequest; | ||
import java.io.IOException; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.atomic.LongAdder; | ||
|
||
public class OneStageAuthenticationStateTest { | ||
|
||
public static class CountingAuthenticationProvider implements AuthenticationProvider { | ||
public LongAdder authCallCount = new LongAdder(); | ||
|
||
@Override | ||
public void initialize(ServiceConfiguration config) throws IOException { | ||
} | ||
|
||
@Override | ||
public String getAuthMethodName() { | ||
return null; | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
} | ||
|
||
@Override | ||
public CompletableFuture<String> authenticateAsync(AuthenticationDataSource authData) { | ||
authCallCount.increment(); | ||
return CompletableFuture.completedFuture(authData.getCommandData()); | ||
} | ||
|
||
public int getAuthCallCount() { | ||
return authCallCount.intValue(); | ||
} | ||
} | ||
|
||
@Test | ||
public void verifyAuthenticateAsyncIsCalledExactlyOnceAndSetsRole() throws Exception { | ||
CountingAuthenticationProvider provider = new CountingAuthenticationProvider(); | ||
AuthData authData = AuthData.of("role".getBytes()); | ||
OneStageAuthenticationState authState = new OneStageAuthenticationState(authData, null, null, provider); | ||
assertEquals(provider.getAuthCallCount(), 0, "Auth count should not increase yet"); | ||
AuthData challenge = authState.authenticateAsync(authData).get(); | ||
assertNull(challenge); | ||
assertEquals(provider.getAuthCallCount(), 1, "Call authenticate only once"); | ||
assertEquals(authState.getAuthRole(), "role"); | ||
AuthenticationDataSource firstAuthenticationDataSource = authState.getAuthDataSource(); | ||
assertTrue(firstAuthenticationDataSource instanceof AuthenticationDataCommand); | ||
|
||
// Verify subsequent call to authenticate does not change data | ||
AuthData secondChallenge = authState.authenticateAsync(AuthData.of("admin".getBytes())).get(); | ||
assertNull(secondChallenge); | ||
assertEquals(authState.getAuthRole(), "role"); | ||
AuthenticationDataSource secondAuthenticationDataSource = authState.getAuthDataSource(); | ||
assertSame(secondAuthenticationDataSource, firstAuthenticationDataSource); | ||
assertEquals(provider.getAuthCallCount(), 1, "Call authenticate only once, even later."); | ||
} | ||
|
||
@SuppressWarnings("deprecation") | ||
@Test | ||
public void verifyAuthenticateIsCalledExactlyOnceAndSetsRole() throws Exception { | ||
CountingAuthenticationProvider provider = new CountingAuthenticationProvider(); | ||
AuthData authData = AuthData.of("role".getBytes()); | ||
OneStageAuthenticationState authState = new OneStageAuthenticationState(authData, null, null, provider); | ||
assertEquals(provider.getAuthCallCount(), 0, "Auth count should not increase yet"); | ||
assertFalse(authState.isComplete()); | ||
AuthData challenge = authState.authenticate(authData); | ||
assertNull(challenge); | ||
assertTrue(authState.isComplete()); | ||
assertEquals(provider.getAuthCallCount(), 1, "Call authenticate only once"); | ||
assertEquals(authState.getAuthRole(), "role"); | ||
AuthenticationDataSource firstAuthenticationDataSource = authState.getAuthDataSource(); | ||
assertTrue(firstAuthenticationDataSource instanceof AuthenticationDataCommand); | ||
|
||
// Verify subsequent call to authenticate does not change data | ||
AuthData secondChallenge = authState.authenticate(AuthData.of("admin".getBytes())); | ||
assertNull(secondChallenge); | ||
assertEquals(authState.getAuthRole(), "role"); | ||
AuthenticationDataSource secondAuthenticationDataSource = authState.getAuthDataSource(); | ||
assertSame(secondAuthenticationDataSource, firstAuthenticationDataSource); | ||
assertEquals(provider.getAuthCallCount(), 1, "Call authenticate only once, even later."); | ||
} | ||
|
||
@Test | ||
public void verifyGetAuthRoleBeforeAuthenticateFails() { | ||
CountingAuthenticationProvider provider = new CountingAuthenticationProvider(); | ||
AuthData authData = AuthData.of("role".getBytes()); | ||
OneStageAuthenticationState authState = new OneStageAuthenticationState(authData, null, null, provider); | ||
assertThrows(AuthenticationException.class, authState::getAuthRole); | ||
assertNull(authState.getAuthDataSource()); | ||
} | ||
|
||
@Test | ||
public void verifyHttpAuthConstructorInitializesAuthDataSourceAndDoesNotAuthenticateData() { | ||
HttpServletRequest request = mock(HttpServletRequest.class); | ||
when(request.getRemoteAddr()).thenReturn("localhost"); | ||
when(request.getRemotePort()).thenReturn(8080); | ||
CountingAuthenticationProvider provider = new CountingAuthenticationProvider(); | ||
OneStageAuthenticationState authState = new OneStageAuthenticationState(request, provider); | ||
assertNotNull(authState.getAuthDataSource()); | ||
assertEquals(provider.getAuthCallCount(), 0); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR looks like a breaking change. In KoP, this method is used to get the role from the token. After upgrading the dependency, many authorization related tests failed. See https://github.com/streamnative/kop/actions/runs/4343910383/jobs/7586547352
Not sure if it should be reverted in master branch, but it should not be cherry-picked to release branches. @michaeljmarshall
/cc @Demogorgon314 @nodece @coderzc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@BewareMyPower - I agree that we shouldn't cherry pick this line. I don't think I did though, taking a quick look at one of the linked commits. I primarily cherry picked the code that calls
authenticate
on the original auth data. Are you seeing this error in the kop logs?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand now. That KoP test references pulsar 3.0.0, so it must be from a recent build of master. First, I'll note that I didn't cherry pick the behavior change to old release branches. I cherry-picked the extra authentication call for original auth data, which was code introduced in this PR. Second, I think this change is necessary for enabling PIP 97. I first documented the contract here: #19283. The primary issue is that creating the
AuthState
should not trigger authenticating theAuthData
. KoP relied on this implementation detail, and that is why the tests failed. Given that the only impacted parties are plugin developers, I think a PIP, documented release notes, a major version bump, and a fail fast behavior (the object throws an exception), make this change acceptable for 3.0.0. Let me know what you think, thanks.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I missed the fact that you have removed these
cherry-picked
labels.Regarding this change, I think you should add another constructor to
OneStageAuthenticationState
. Currently, the first argumentauthData
is actually not used. Maybe you didn't remove it because it's public. (However, you removed the exception signature) But the semantic changed and it looked weird now.To keep the compatibility, it's better to mark the constructor as deprecated first and keep it the semantic as "authenticate synchronously". To use the asynchronous semantic, you should add another constructor to do that.
I'd rather use "semantics" instead of the "implementation detail". Take
Producer#send
as example:If you made a change that makes
send
asynchronous, it should be treated as a breaking change. But if you just modified the exception message, for example, adding a common prefix to the exception message, it would be okay.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@BewareMyPower - thanks for the context and the suggestion.
Thanks for the suggestion, I agree this is the best way to move forward without breaking any other applications when 3.0.0 is released. I also agree that it makes the API clearer because the unused
AuthData
won't be passed in. I shoul be able to get a PR submitted within 24 hours or so.I didn't realize this would cause an issue. I'll add it back when I fix the constructor. Out of curiosity, what is the consequence of breaking source compatibility by removing the exception from the method signature?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. I just noticed this point when I reviewed again. After looking for some documents and testing the ABI compatibility locally, it seems that removing the exception signature does not affect the ABI compatibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@BewareMyPower - I finally got a chance to come back to this, and now I remember that I did try to use a similar solution that you proposed while writing this PR, but I found it didn't actually work as you propose. There are a few details to show the issue:
First, the
OneStageAuthenticationState
is not used directly by the broker or by plugins. Instead, the broker and plugins useAuthenticationProvider#newAuthState
to create theAuthenticationState
object. Here is the code:pulsar/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
Lines 96 to 104 in fb7f14c
As such, changing the constructor in the way you propose is unlikely to help users that built their own extensions. For example, KoP does the following:
https://github.com/streamnative/kop/blob/c9f66e4d24608dea6c7f4badde1d288b7dc1f74d/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryManager.java#L105-L106
Users must rely on the
AuthenticationProvider#newAuthState
in order to make theAuthenticationProvider
pluggable.When I noticed this detail while writing this PR, I thought about adding a new method to the
AuthenticationProvider
that would not take theAuthData
object, in the same way that you proposed for the newOneStageAuthenticationState
constructor. However, the main issue is how to default that method in the interface. We could use the following definition:This would be analogous to the current
newAuthState
method, but the issue is that customAuthenticationProvider
implementations that override the currentnewAuthState
method likely don't want to use theOneStageAuthenticationState
, and that would lead to unexpected behavior.Another option could be:
However, that would require breaking the current implementation in the way you're concerned about. (For what it's worth, this option might be the best long term solution for cleaning up the interface if we want to remove the
authData
that is passed. The main point is that it doesn't take care of your concern of not breaking user code.)Another option:
This unimplemented option has two problems. First, it will break user code if they do not implement the method. Since this whole exercise is an effort to try to prevent users from needing to change any code, that isn't a good option. Second, it removes the
OneStageAuthenticationState
default already present in the interface, which seems problematic.In my view, the primary challenge is that the
AuthenticationProvider
interface has a brittle default behavior.Given the above, do you have any ideas on how to make this better for users building their own broker extensions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you show an example of a possible unexpected behavior? IMO, the existing authentication provider only overrides the previous
newAuthState
method still has the previous behavior and is not affected by the newnewAuthState
method.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The challenge comes from the fact that Apache Pulsar will transition to using the new method to build the state object and if that method is not overridden by the custom
AuthenticationProvider
implementation, theOneStageAuthenticationState
will be used. That will lead to unexpected behavior.A good example is the
AuthenticationProviderToken
class. That one would "work" with theOneStageAuthenticationState
with the key difference being theisExpired
value fromOneStageAuthenticationState
always returnsfalse
while theTokenAuthenticationState
returnstrue
when the token'sexp
claim indicates the token has expired.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got it. The key point is that Pulsar will prefer the new overload but the existing implementations might still implement the old overload. It seems that there is no better way to solve it perfectly.