-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Issue 10816][Proxy] Refresh client auth token #13339
Conversation
@KKKoder I think you do too much checkstyle format in this PR. I would suggest to split them if needed |
@KKKoder let me open a PR for checkstyle fix |
Ok, do it. |
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
Outdated
Show resolved
Hide resolved
The pr had no activity for 30 days, mark with Stale label. |
.forEach(future -> future.complete(clientData)); | ||
} | ||
|
||
private CompletableFuture<AuthData> getOrRefreshClientAuthData() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you walk me through what this method suppose to do? Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When client auth data expired and proxy configured with forward authentication data, then this method requests a new token. This method is thread safe, because one ProxyConnection has multiple ProxyClientCnx, and potentially at same time, they request
auth data.
// mutual authentication is to auth between `remoteHostName` and this client for this channel. | ||
// each channel will have a mutual client/server pair, mutual client evaluateChallenge with init data, | ||
// and return authData to server. | ||
authenticationDataProvider = authentication.getAuthData(remoteHostName); | ||
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); | ||
return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion, | ||
PulsarVersion.getVersion(), proxyToTargetBrokerAddress, null, null, null); | ||
return CompletableFuture.completedFuture( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the point of using CompletableFuture here ? It seems there is no async operations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is overridden in the ProxyClientCnx class, which can use the asynchronous operation of obtaining authorization data from the client, so this method returns a CompletableFuture
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've done some changes, check them please.
// mutual authentication is to auth between `remoteHostName` and this client for this channel. | ||
// each channel will have a mutual client/server pair, mutual client evaluateChallenge with init data, | ||
// and return authData to server. | ||
authenticationDataProvider = authentication.getAuthData(remoteHostName); | ||
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); | ||
return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion, | ||
PulsarVersion.getVersion(), proxyToTargetBrokerAddress, null, null, null); | ||
return CompletableFuture.completedFuture( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point
try { | ||
clientAuthDataSupplier.get() | ||
.thenAccept(authData -> sendAuthResponse(authData, clientAuthMethod)); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how is it possible to have an Exception here ?
in case of failure we should deal with the error and send a response or close the connection
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. I will fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've already fixed it, make a review please.
@Test | ||
void testRefreshClientToken() throws Exception { | ||
log.info("-- Starting {} test --", methodName); | ||
|
||
startProxy(); | ||
createAdminClient(); | ||
|
||
@SuppressWarnings("unchecked") | ||
Supplier<String> tokenSupplier = Mockito.mock(Supplier.class); | ||
when(tokenSupplier.get()).thenAnswer(answer -> createClientJwtToken(Duration.ofSeconds(1))); | ||
|
||
PulsarClient proxyClient = PulsarClient.builder() | ||
.serviceUrl(proxyService.getServiceUrl()).statsInterval(0, TimeUnit.SECONDS) | ||
.authentication(AuthenticationFactory.token(tokenSupplier)) | ||
.operationTimeout(1000, TimeUnit.MILLISECONDS) | ||
.build(); | ||
|
||
String namespaceName = "my-property/proxy-authorization/my-ns"; | ||
admin.clusters().createCluster("proxy-authorization", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); | ||
admin.tenants().createTenant("my-property", | ||
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization"))); | ||
admin.namespaces().createNamespace(namespaceName); | ||
|
||
admin.namespaces().grantPermissionOnNamespace(namespaceName, CLIENT_ROLE, | ||
Sets.newHashSet(AuthAction.consume, AuthAction.produce)); | ||
log.info("-- Admin permissions {} ---", admin.namespaces().getPermissions(namespaceName)); | ||
|
||
Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES) | ||
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create(); | ||
|
||
final int msgs = 10; | ||
for (int i = 0; i < msgs; i++) { | ||
String message = "my-message-" + i; | ||
producer.send(message.getBytes()); | ||
} | ||
|
||
//noinspection unchecked | ||
clearInvocations(tokenSupplier); | ||
Thread.sleep(3000); | ||
verify(tokenSupplier, atLeastOnce()).get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this test is correct. It is passing for us, even without the associated fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. thanks. I'll check and fix the test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've made another test, check please.
The pr had no activity for 30 days, mark with Stale label. |
Hi @kkoderok, thanks for your contribution! Can you still work on this PR? If you don't have time, I will work on this PR. |
Closed by #17831. |
Fixes #10816
Motivation
See #10816
Modifications
Refresh client token on proxy (to lookup) in case when refresh token command have been received from broker.
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
Check the box below and label this PR (if you have committer privilege).
Need to update docs?
no-need-doc
Bug fix