Skip to content

Commit e4f1f09

Browse files
[improve][ws] Use allowTopicOperationAsync for authz checks (#20299)
Similar to: #20142 ### Motivation In #20142 we changed the `Consumer` and the `Producer` logic to call the correct `AuthorizationService` method. Our goal is to deprecate the `AuthorizationService` methods for `canProduce` and `canConsume`, so this change helps us move in the right direction. This PR follows the same logic and updates the WebSocket proxy to remove all calls to the `can*` methods in the `AuthorizationService` ### Modifications * Update `ProducerHandler`, `ConsumerHandler`, and `ReaderHander` in the WebSocket Proxy to call the `AuthorizationService#allowTopicOperationAsync` method. ### Verifying this change This change is trivial. ### Documentation - [x] `doc-not-needed` ### Matching PR in forked repository PR in forked repository: Skipping PR as I ran tests locally.
1 parent 908d0b3 commit e4f1f09

File tree

3 files changed

+51
-5
lines changed

3 files changed

+51
-5
lines changed

pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.websocket;
2020

2121
import static com.google.common.base.Preconditions.checkArgument;
22+
import static java.util.concurrent.TimeUnit.SECONDS;
2223
import com.fasterxml.jackson.core.JsonProcessingException;
2324
import com.google.common.base.Enums;
2425
import com.google.common.base.Splitter;
@@ -33,6 +34,7 @@
3334
import java.util.concurrent.atomic.LongAdder;
3435
import javax.servlet.http.HttpServletRequest;
3536
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
37+
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
3638
import org.apache.pulsar.client.api.Consumer;
3739
import org.apache.pulsar.client.api.ConsumerBuilder;
3840
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
@@ -47,6 +49,7 @@
4749
import org.apache.pulsar.client.api.TopicMessageId;
4850
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
4951
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
52+
import org.apache.pulsar.common.policies.data.TopicOperation;
5053
import org.apache.pulsar.common.util.Codec;
5154
import org.apache.pulsar.common.util.DateFormatter;
5255
import org.apache.pulsar.websocket.data.ConsumerCommand;
@@ -467,8 +470,21 @@ protected ConsumerBuilder<byte[]> getConsumerConfiguration(PulsarClient client)
467470

468471
@Override
469472
protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
470-
return service.getAuthorizationService().canConsume(topic, authRole, authenticationData,
471-
this.subscription);
473+
try {
474+
AuthenticationDataSubscription subscription = new AuthenticationDataSubscription(authenticationData,
475+
this.subscription);
476+
return service.getAuthorizationService()
477+
.allowTopicOperationAsync(topic, TopicOperation.CONSUME, authRole, subscription)
478+
.get(service.getConfig().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
479+
} catch (InterruptedException e) {
480+
log.warn("Time-out {} sec while checking authorization on {} ",
481+
service.getConfig().getMetadataStoreOperationTimeoutSeconds(), topic);
482+
throw e;
483+
} catch (Exception e) {
484+
log.warn("Consumer-client with Role - {} failed to get permissions for topic - {}. {}", authRole, topic,
485+
e.getMessage());
486+
throw e;
487+
}
472488
}
473489

474490
public static String extractSubscription(HttpServletRequest request) {

pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import static com.google.common.base.Preconditions.checkArgument;
2222
import static java.lang.String.format;
23+
import static java.util.concurrent.TimeUnit.SECONDS;
2324
import static org.apache.pulsar.websocket.WebSocketError.FailedToDeserializeFromJSON;
2425
import static org.apache.pulsar.websocket.WebSocketError.PayloadEncodingError;
2526
import static org.apache.pulsar.websocket.WebSocketError.UnknownError;
@@ -45,6 +46,7 @@
4546
import org.apache.pulsar.client.api.PulsarClient;
4647
import org.apache.pulsar.client.api.SchemaSerializationException;
4748
import org.apache.pulsar.client.api.TypedMessageBuilder;
49+
import org.apache.pulsar.common.policies.data.TopicOperation;
4850
import org.apache.pulsar.common.util.DateFormatter;
4951
import org.apache.pulsar.common.util.ObjectMapperFactory;
5052
import org.apache.pulsar.websocket.data.ProducerAck;
@@ -242,7 +244,19 @@ public long getMsgPublishedCounter() {
242244

243245
@Override
244246
protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
245-
return service.getAuthorizationService().canProduce(topic, authRole, authenticationData);
247+
try {
248+
return service.getAuthorizationService()
249+
.allowTopicOperationAsync(topic, TopicOperation.PRODUCE, authRole, authenticationData)
250+
.get(service.getConfig().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
251+
} catch (InterruptedException e) {
252+
log.warn("Time-out {} sec while checking authorization on {} ",
253+
service.getConfig().getMetadataStoreOperationTimeoutSeconds(), topic);
254+
throw e;
255+
} catch (Exception e) {
256+
log.warn("Producer-client with Role - {} failed to get permissions for topic - {}. {}", authRole, topic,
257+
e.getMessage());
258+
throw e;
259+
}
246260
}
247261

248262
private void sendAckResponse(ProducerAck response) {

pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.websocket;
2020

21+
import static java.util.concurrent.TimeUnit.SECONDS;
2122
import static org.apache.commons.lang3.StringUtils.isNotBlank;
2223
import com.fasterxml.jackson.core.JsonProcessingException;
2324
import java.io.IOException;
@@ -28,6 +29,7 @@
2829
import javax.servlet.http.HttpServletRequest;
2930
import javax.servlet.http.HttpServletResponse;
3031
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
32+
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
3133
import org.apache.pulsar.client.api.Consumer;
3234
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
3335
import org.apache.pulsar.client.api.MessageId;
@@ -38,6 +40,7 @@
3840
import org.apache.pulsar.client.impl.MessageIdImpl;
3941
import org.apache.pulsar.client.impl.MultiTopicsReaderImpl;
4042
import org.apache.pulsar.client.impl.ReaderImpl;
43+
import org.apache.pulsar.common.policies.data.TopicOperation;
4144
import org.apache.pulsar.common.util.DateFormatter;
4245
import org.apache.pulsar.websocket.data.ConsumerCommand;
4346
import org.apache.pulsar.websocket.data.ConsumerMessage;
@@ -310,8 +313,21 @@ protected void updateDeliverMsgStat(long msgSize) {
310313

311314
@Override
312315
protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
313-
return service.getAuthorizationService().canConsume(topic, authRole, authenticationData,
314-
this.subscription);
316+
try {
317+
AuthenticationDataSubscription subscription = new AuthenticationDataSubscription(authenticationData,
318+
this.subscription);
319+
return service.getAuthorizationService()
320+
.allowTopicOperationAsync(topic, TopicOperation.CONSUME, authRole, subscription)
321+
.get(service.getConfig().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
322+
} catch (InterruptedException e) {
323+
log.warn("Time-out {} sec while checking authorization on {} ",
324+
service.getConfig().getMetadataStoreOperationTimeoutSeconds(), topic);
325+
throw e;
326+
} catch (Exception e) {
327+
log.warn("Consumer-client with Role - {} failed to get permissions for topic - {}. {}", authRole, topic,
328+
e.getMessage());
329+
throw e;
330+
}
315331
}
316332

317333
private int getReceiverQueueSize() {

0 commit comments

Comments
 (0)