Skip to content

Commit

Permalink
[improve][admin] Check if the topic existed before the permission ope…
Browse files Browse the repository at this point in the history
…rations (#22547)
  • Loading branch information
Technoboy- committed Apr 26, 2024
1 parent ea584d0 commit adac20a
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ protected CompletableFuture<List<String>> internalGetPartitionedTopicListAsync()
protected CompletableFuture<Map<String, Set<AuthAction>>> internalGetPermissionsOnTopic() {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
return validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> internalCheckTopicExists(topicName))
.thenCompose(__ -> getAuthorizationService().getPermissionsAsync(topicName));
}

Expand Down Expand Up @@ -258,9 +259,10 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse
Set<AuthAction> actions) {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
grantPermissionsAsync(topicName, role, actions)
.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()))))
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> internalCheckTopicExists(topicName))
.thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions))
.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause);
Expand All @@ -272,6 +274,7 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse
protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> internalCheckTopicExists(topicName))
.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, true, false)
.thenCompose(metadata -> {
int numPartitions = metadata.partitions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public void testGetCreateDeleteSchema() throws Exception {
.serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString())
.authentication(AuthenticationToken.class.getName(), PRODUCE_TOKEN)
.build();
admin.topics().createNonPartitionedTopic(topicName);
admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume));
admin.topics().grantPermission(topicName, "producer", EnumSet.of(AuthAction.produce));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3698,4 +3698,16 @@ public void testRetentionAndBacklogQuotaCheck() throws PulsarAdminException {
});

}

@Test
@SneakyThrows
public void testPermissions() {
String namespace = "prop-xyz/ns1/";
final String random = UUID.randomUUID().toString();
final String topic = "persistent://" + namespace + random;
final String subject = UUID.randomUUID().toString();
assertThrows(NotFoundException.class, () -> admin.topics().getPermissions(topic));
assertThrows(NotFoundException.class, () -> admin.topics().grantPermission(topic, subject, Set.of(AuthAction.produce)));
assertThrows(NotFoundException.class, () -> admin.topics().revokePermissions(topic, subject));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -890,12 +890,15 @@ public void testGetList() throws Exception {
public void testGrantNonPartitionedTopic() {
final String topicName = "non-partitioned-topic";
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
Expand Down Expand Up @@ -957,12 +960,15 @@ public void testGrantPartitionedTopic() {
public void testRevokeNonPartitionedTopic() {
final String topicName = "non-partitioned-topic";
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public AuthorizationTest() {
@Override
public void setup() throws Exception {
conf.setClusterName("c1");
conf.setSystemTopicEnabled(false);
conf.setForceDeleteNamespaceAllowed(true);
conf.setAuthenticationEnabled(true);
conf.setAuthenticationProviders(
Sets.newHashSet("org.apache.pulsar.broker.auth.MockAuthenticationProvider"));
Expand Down Expand Up @@ -96,8 +98,9 @@ public void simple() throws Exception {
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));

admin.topics().grantPermission("persistent://p1/c1/ns1/ds2", "other-role",
EnumSet.of(AuthAction.consume));
String topic = "persistent://p1/c1/ns1/ds2";
admin.topics().createNonPartitionedTopic(topic);
admin.topics().grantPermission(topic, "other-role", EnumSet.of(AuthAction.consume));
waitForChange();

assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null));
Expand Down Expand Up @@ -167,8 +170,9 @@ public void simple() throws Exception {
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "my.role.1", null));
assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "my.role.2", null));

admin.topics().grantPermission("persistent://p1/c1/ns1/ds1", "my.*",
EnumSet.of(AuthAction.produce));
String topic1 = "persistent://p1/c1/ns1/ds1";
admin.topics().createNonPartitionedTopic(topic1);
admin.topics().grantPermission(topic1, "my.*", EnumSet.of(AuthAction.produce));
waitForChange();

assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null));
Expand Down Expand Up @@ -231,7 +235,7 @@ public void simple() throws Exception {
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "role2-sub2"));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "pulsar.super_user", null, "role3-sub1"));

admin.namespaces().deleteNamespace("p1/c1/ns1");
admin.namespaces().deleteNamespace("p1/c1/ns1", true);
admin.tenants().deleteTenant("p1");
admin.clusters().deleteCluster("c1");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws
closeAdmin();
admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).build());
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
admin.topics().grantPermission("persistent://my-property/my-ns/my-topic", "anonymousUser",
String topic = "persistent://my-property/my-ns/my-topic";
admin.topics().createNonPartitionedTopic(topic);
admin.topics().grantPermission(topic, "anonymousUser",
EnumSet.allOf(AuthAction.class));

// setup the client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ public void testSubscriberPermission() throws Exception {
}

// grant topic consume authorization to the subscriptionRole
tenantAdmin.topics().createNonPartitionedTopic(topicName);
tenantAdmin.topics().grantPermission(topicName, subscriptionRole,
Collections.singleton(AuthAction.consume));

Expand Down Expand Up @@ -773,6 +774,7 @@ public void testPermissionForProducerCreateInitialSubscription() throws Exceptio
admin.tenants().createTenant("my-property",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
admin.topics().createNonPartitionedTopic(topic);
admin.topics().grantPermission(topic, invalidRole, Collections.singleton(AuthAction.produce));
admin.topics().grantPermission(topic, producerRole, Sets.newHashSet(AuthAction.produce, AuthAction.consume));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public ProxyAuthorizationTest() {
@Override
protected void setup() throws Exception {
conf.setClusterName(configClusterName);
conf.setForceDeleteNamespaceAllowed(true);
internalSetup();

WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
Expand Down Expand Up @@ -99,8 +100,9 @@ public void test() throws Exception {
assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));

admin.topics().grantPermission("persistent://p1/c1/ns1/ds2", "other-role",
EnumSet.of(AuthAction.consume));
String topic = "persistent://p1/c1/ns1/ds2";
admin.topics().createNonPartitionedTopic(topic);
admin.topics().grantPermission(topic, "other-role", EnumSet.of(AuthAction.consume));
waitForChange();

assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null));
Expand All @@ -117,7 +119,7 @@ public void test() throws Exception {
assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null));
assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null, null));

admin.namespaces().deleteNamespace("p1/c1/ns1");
admin.namespaces().deleteNamespace("p1/c1/ns1", true);
admin.tenants().deleteTenant("p1");
admin.clusters().deleteCluster("c1");
}
Expand Down

0 comments on commit adac20a

Please sign in to comment.