[KAFKA-13848] Clients remain connected after SASL re-authentication f… #12179
showuon merged 2 commits into apache:trunk
Conversation
@rajinisivaram, @rondagostino please could you take a look?
Probably worth a comment explaining that sessionExpirationTimeNanos should always be set to a non-null value.
After some investigation, I found that a null value indicates that re-auth is disabled:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java#L535-L543
So, maybe here we should make sure re-auth is actually needed (that is, we did have an expiration time; a lifetime of 0 means it's expired, not that re-auth is disabled). For example:

if (retvalSessionLifetimeMs > 0L || credentialExpirationMs != null || connectionsMaxReauthMs != null)
    sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * retvalSessionLifetimeMs;

So, if retvalSessionLifetimeMs == 0 and we have neither a credential expiration ms nor a connections max reauth ms, we should keep it as null.
Does that make sense?
Adding || credentialExpirationMs != null || connectionsMaxReauthMs != null to the if condition adds no value, as we already know one of them has to be true to reach the if guard in the first place.
I think the correct fix is to delay the initialisation of retvalSessionLifetimeMs so that we can use a >= test in the expiry case and 0 in the non-expiry cases, while still allowing sessionExpirationTimeNanos being null to signal that re-auth is disabled.
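To make the intended semantics concrete, here is a small standalone model (names are borrowed from the discussion, but the structure is a simplification and an assumption, not the actual Kafka code): a null result means "re-auth disabled", while a clamped-to-zero lifetime still produces an expiration time so an already-expired session is closed promptly.

```java
public class SessionLifetimeSketch {
    // Hypothetical, simplified model of the session-lifetime calculation discussed above.
    static Long sessionExpirationTimeNanos(Long credentialExpirationMs,
                                           Long connectionsMaxReauthMs,
                                           long authenticationEndMs,
                                           long authenticationEndNanos) {
        // Neither source of expiry is configured: re-auth is disabled, keep null.
        if (credentialExpirationMs == null && connectionsMaxReauthMs == null)
            return null;
        long lifetimeMs;
        if (credentialExpirationMs == null)
            lifetimeMs = connectionsMaxReauthMs;
        else if (connectionsMaxReauthMs == null)
            lifetimeMs = credentialExpirationMs - authenticationEndMs;
        else
            lifetimeMs = Math.min(credentialExpirationMs - authenticationEndMs, connectionsMaxReauthMs);
        // An expired credential yields a non-positive lifetime; clamping to 0 still
        // sets an expiration time, so the broker can close the channel promptly.
        lifetimeMs = Math.max(0L, lifetimeMs);
        return authenticationEndNanos + 1_000_000L * lifetimeMs;
    }

    public static void main(String[] args) {
        // Re-auth disabled: null.
        System.out.println(sessionExpirationTimeNanos(null, null, 0L, 0L));
        // Credential already expired: expiration equals the authentication end time.
        System.out.println(sessionExpirationTimeNanos(40L, null, 100L, 5L));
    }
}
```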
@acsaki, we haven't gotten a response from you for some days; do you need help with it? We can co-author with you to address the comments and fix the tests. Please let me know. Thank you.
Hi @showuon, thank you! I wasn't working for a few days. Thanks for all the suggestions (@SamBarker and @tombentley too). I'm writing some more tests to capture the intended behavior, which is as I understood it:
SaslServerAuthenticatorTest does not lend itself easily to testing all these, oh well. :)
SamBarker left a comment
I agree that testing is painful and you are doing a great job improving it.
I'm wondering if it's worth extracting a new test class, SaslServerAuthenticatorSessionExpiryTest or similar; that would allow some of the common setup code to be extracted to a @BeforeEach and thus make it easier to reason about what's happening in the tests.
I've cleaned up the tests a bit to make them more readable while leaving them as explicit as they can be. I usually don't mind slightly relaxed DRY if tests can be understood as-is. Also, MockedStatics are supposed to be closed shortly after they've done their thing, so using them in a try-with-resources block is usually considered best. Any remaining redundancies in your opinion that should be dealt with?
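As an aside, the try-with-resources pattern described here looks roughly like the following. This is an illustrative sketch, not the PR's actual test code: mockito-inline must be on the test classpath for static mocking, and the stubbed method is the JDK's javax.security.sasl.Sasl#createSaslServer.

```java
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslServer;

import org.mockito.MockedStatic;
import org.mockito.Mockito;

class StaticMockSketch {
    void example(SaslServer saslServer) {
        // The static mock is active only inside this block and is deregistered
        // when try-with-resources closes it, so it cannot leak into other tests.
        try (MockedStatic<Sasl> sasl = Mockito.mockStatic(Sasl.class)) {
            sasl.when(() -> Sasl.createSaslServer(Mockito.anyString(), Mockito.anyString(),
                        Mockito.anyString(), Mockito.anyMap(), Mockito.any()))
                .thenReturn(saslServer);
            // ... exercise the code under test while Sasl.createSaslServer is stubbed ...
        }
    }
}
```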
I agree; I don't tend to be overly bothered by DRY in tests either. It's about readability, and thus being able to easily see what the test is trying to prove.
I think the tests are fine as is, anything else is just polish and style changes.
when reauth is disabled (when max reauth ms is NOT set), leave ReauthInfo#sessionExpirationTimeNanos as null but return millis until the token expires in SaslAuthenticateResponse's sessionLifetimeMs
I think I have a different understanding. My mental model was that if either of credentialExpirationMs != null || connectionsMaxReauthMs != null was true we have re-auth enabled. The re-auth disabled case I was thinking of was when both were false. Now that I think about that I'm not sure that is a valid scenario.
I think your original change to make setting sessionExpirationTimeNanos unconditional was the right one; my >= 0 test is effectively the same as making it unconditional. Therefore my suggestion of delayed initialisation becomes moot, as we don't need to worry about excluding the default value from the test.
I was wondering if the property change was what was actually causing the test failures that you found, rather than making it unconditional.
@showuon what is your thinking on this?
I was wondering if the property change was what was actually causing the test failures that you found rather than making it un-conditional.
@SamBarker, I don't understand this. Which property are you referring to?
OK, for @SamBarker , I think we can discuss the property change in a separate thread since it's related to tests.
For this change:
if (connectionsMaxReauthMs != null) {
sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * retvalSessionLifetimeMs;
}
I understand why you did this @acsaki . It's because you think:
when reauth is disabled (when max reauth ms is NOT set), leave ReauthInfo#sessionExpirationTimeNanos as null but return millis until the token expires in SaslAuthenticateResponse's sessionLifetimeMs
I think it's correct IF the SASL client does close the connection after sessionLifetimeMs. But I don't think we should make this optimistic assumption for this "potential" security issue. I agree with @SamBarker that your original version, "removing the if condition", is a good fix:
...
else
retvalSessionLifetimeMs = zeroIfNegative(Math.min(credentialExpirationMs - authenticationEndMs,
sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * retvalSessionLifetimeMs;
That is, no matter whether re-auth is enabled or not, we set sessionExpirationTimeNanos to inform the server too, so that we can make sure that when the session expires, either the server or the client will kill this connection. WDYT?
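To illustrate why always setting the value matters, here is a minimal standalone model of the server-side poll check. The real logic lives in KafkaChannel#serverAuthenticationSessionExpired; this sketch only mirrors its semantics and is not the actual Kafka code.

```java
public class SessionExpirySketch {
    // null means re-auth is disabled: the server never expires the session itself,
    // which is exactly why leaving it null for an expired token keeps a dead session alive.
    static boolean serverAuthenticationSessionExpired(Long sessionExpirationTimeNanos, long nowNanos) {
        return sessionExpirationTimeNanos != null && nowNanos - sessionExpirationTimeNanos > 0;
    }

    public static void main(String[] args) {
        // With the expiration time left null, even a long-expired token never trips the check.
        System.out.println(serverAuthenticationSessionExpired(null, Long.MAX_VALUE));
        // With it set, the session is detected as expired on the next poll.
        System.out.println(serverAuthenticationSessionExpired(1_000L, 2_000L));
    }
}
```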
Also, the debug line at L698 confuses me a little. When credentialExpirationMs != null && sessionExpirationTimeNanos == null, we'll log:
"Authentication complete; session max lifetime from broker config={} ms, credential expiration={} ({} ms); no session expiration, sending 0 ms to client"
The part that confuses me here is "no session expiration". Why is there no session expiration? Since credentialExpirationMs != null, there must be some credential expiration time, so in this case sending 0 to the client doesn't make sense to me, right? I think the log should also be updated. WDYT?
@rondagostino @rajinisivaram , do you have any comments? Thanks.
@showuon this is what I've also found rather confusing. I agree that after the token expires the connection should be closed sooner or later, which isn't going to happen when sessionExpirationTimeNanos is not set. On the other hand, there is the Authenticator interface, where comments suggest that #serverSessionExpirationTimeNanos should be left as null when re-authentication is "disabled". Does it make sense for re-auth to be disabled? Or rather, are there some clients or SASL mechanisms where we don't expect re-authentication to ever happen?
While there is the Authenticator interface where comments suggest that #serverSessionExpirationTimeNanos should be left as null when re-authentication is "disabled"
Is this javadoc what you mean? From the javadoc, I don't see it saying we should return null when re-auth is disabled. I think it's OK to return the value when re-auth is disabled.
there are some clients or SASL mechanisms where we don't expect reauthentication to ever happen?
Yes, it is. You can check this for reference.
Thanks.
showuon left a comment
Nice tests! Thank you. Left some comments.
Also, I found there are many tests failing with the error:
org.opentest4j.AssertionFailedError: Topic [__consumer_offsets] metadata not propagated after 60000 ms
ref: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12179/4/#showFailuresLink
I guess maybe it's because we now use mock time instead of system time? Please help check them. Thank you.
when(selectionKey.attachment()).thenReturn(kafkaChannel);
selectionKey.attach(kafkaChannel);
Set<SelectionKey> selectionKeys = Utils.mkSet(selectionKey);
selector.pollSelectionKeys(selectionKeys, false, System.nanoTime());

assertFalse(selector.connected().contains(kafkaChannel.id()));
assertTrue(selector.disconnected().containsKey(kafkaChannel.id()));
assertNull(selectionKey.attachment());
Why did we make this change? Did we change anything that affects this test?
This test failed after my change to use mockito-inline to mock statics. The selectionKey.attach line called the mock's method directly (pointlessly?), while it looked like the actual intention was for selectionKey.attachment() to return the kafkaChannel; that's how it actually worked. Calling the mock method directly in the test and later asserting on the attachment being null seemed confusing to me, and the assert actually fails too (with #attachment returning the kafkaChannel). Maybe this was sort of overbearing; is there a simpler way to fix the test?
Thanks for the explanation. I agree with the change. Makes sense.
when(selectionKey.isReadable()).thenReturn(true);
when(selectionKey.readyOps()).thenReturn(SelectionKey.OP_READ);
selectionKey.attach(channel);
when(selectionKey.attachment())
    .thenReturn(channel)
    .thenReturn(null);
testImplementation libs.bcpkix
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation libs.mockitoInline
Could you explain why we need mockito-inline here? Is there any method that only exists in mockito-inline?
I needed this to mock out ChannelBuilders#createPrincipalBuilder and Sasl#createSaslServer. It's a pity there are these static calls; they make this hard to test.
ChannelBuilders is our code, we should avoid mocking static methods.
bump, I second we should aim to remove this
Thank you, I hadn't thought of MockTime. Indeed, a lot of tests are still failing; I've found out that in some tests…
@acsaki, is there any update for this PR? Do you need any help?
…ails Reauthentication should be considered unset when connections.max.reauth.ms is at its default 0L value.
Thanks @showuon, I really like the idea that we should set it regardless. Regarding whether max reauth is in play, I realized that a null check might not be enough; we should probably treat the default 0L the same. (I think max reauth = 0 doesn't really make sense, does it?)
    .getNegotiatedProperty(SaslInternalConfigs.CREDENTIAL_LIFETIME_MS_SASL_NEGOTIATED_PROPERTY_KEY);
Long connectionsMaxReauthMs = connectionsMaxReauthMsByMechanism.get(saslMechanism);
if (credentialExpirationMs != null || connectionsMaxReauthMs != null) {
    boolean maxReauthSet = connectionsMaxReauthMs != null && connectionsMaxReauthMs > 0;
boolean maxReauthSet = connectionsMaxReauthMs != null && connectionsMaxReauthMs > 0;
Makes sense to me.
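As a standalone illustration of that guard (a sketch, not the actual patch): connections.max.reauth.ms defaults to 0, which should count as "re-auth disabled" just like the property being absent.

```java
public class MaxReauthSketch {
    // Mirrors the guard quoted above: null (property absent) and the default 0
    // are both treated as "max reauth not set".
    static boolean maxReauthSet(Long connectionsMaxReauthMs) {
        return connectionsMaxReauthMs != null && connectionsMaxReauthMs > 0;
    }

    public static void main(String[] args) {
        System.out.println(maxReauthSet(null));       // property absent
        System.out.println(maxReauthSet(0L));         // default value
        System.out.println(maxReauthSet(3_600_000L)); // explicitly configured
    }
}
```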
Thank you! Some tests are failing on my machine too, but pass when I run them in the IDE. :( Might be some flakiness, but I'm looking into it right now.
Failed tests are unrelated.
@acsaki, thanks for your good finding and the fix! And thanks to @tombentley and @SamBarker for the review!
Thank you @showuon, @tombentley and @SamBarker for the review, guidance and help! I'm really happy to see my first contribution merged, thank you!
…-2022

* apache/trunk: (52 commits)
  KAFKA-13967: Document guarantees for producer callbacks on transaction commit (apache#12264)
  [KAFKA-13848] Clients remain connected after SASL re-authentication f… (apache#12179)
  KAFKA-10000: Zombie fencing logic (apache#11779)
  KAFKA-13947: Use %d formatting for integers rather than %s (apache#12267)
  KAFKA-13929: Replace legacy File.createNewFile() with NIO.2 Files.createFile() (apache#12197)
  KAFKA-13780: Generate OpenAPI file for Connect REST API (apache#12067)
  KAFKA-13917: Avoid calling lookupCoordinator() in tight loop (apache#12180)
  KAFKA-10199: Implement removing active and standby tasks from the state updater (apache#12270)
  MINOR: Update Scala to 2.13.8 in gradle.properties (apache#12273)
  MINOR: add java 8/scala 2.12 deprecation info in doc (apache#12261)
  ...

Conflicts: gradle.properties
Clients remain connected and able to produce or consume despite an expired OAUTHBEARER token.
The problem can be reproduced using the https://github.com/acsaki/kafka-sasl-reauth project: start the embedded OAuth2 server and Kafka, run the long-running consumer in OAuthBearerTest, then kill the OAuth2 server, making the client unable to re-authenticate.
The root cause seems to be SaslServerAuthenticator#calcCompletionTimesAndReturnSessionLifetimeMs failing to set ReauthInfo#sessionExpirationTimeNanos when tokens have already expired (when the session lifetime goes negative), in turn causing KafkaChannel#serverAuthenticationSessionExpired to return false and finally SocketServer not closing the channel.
The issue is observed with OAUTHBEARER but seems to have a wider impact on SASL re-authentication.