Skip to content

Commit

Permalink
rptest/tests: Test bad acl kafka topic resource
Browse files Browse the repository at this point in the history
(cherry picked from commit 389e3b5)
  • Loading branch information
Rob Blafford authored and oleiman committed Jun 11, 2024
1 parent a662e1f commit a3fa6a9
Showing 1 changed file with 33 additions and 1 deletion.
34 changes: 33 additions & 1 deletion tests/rptest/tests/acls_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.cluster import cluster
from rptest.services.admin import Admin
from rptest.clients.rpk import RpkTool, ClusterAuthorizationError, RpkException
from rptest.clients.rpk import RpkTool, ClusterAuthorizationError, RpkException, AclList
from rptest.services.redpanda import SecurityConfig, TLSProvider
from rptest.services.redpanda_installer import RedpandaInstaller, wait_for_num_versions
from rptest.services import tls
Expand Down Expand Up @@ -245,6 +245,38 @@ def check_super_user_perms():
timeout_sec=timeout_sec,
err_msg=f'super user: {err_msg}')

@cluster(num_nodes=3)
def test_invalid_acl_topic_name(self):
self.prepare_cluster(use_sasl=True, use_tls=False, authn_method=None)

# Ensure creating an ACL topic resource with a valid kafka topic name works
client = self.get_super_client()
resource = 'my_topic'
results = AclList.parse_raw(
client.sasl_allow_principal(principal='base',
operations=['all'],
resource='topic',
resource_name=resource))
self.redpanda.logger.info(f'{results._acls}')
assert results.has_permission(
'base', 'all', 'topic',
resource), f'Failed to create_acl for resource {resource}'

# Assert that appropriate error was returned by the server for invalid
# kafka topic names
resource = 'my bad topic name'
results = AclList.parse_raw(
client.sasl_allow_principal(principal='base',
operations=['all'],
resource='topic',
resource_name=resource))
acls = results._acls['base']
assert acls is not None, "Missing principal from create_acls result"

acl = [acl for acl in acls if acl.resource_name == resource]
assert len(acl) == 1, f'Expected match for {resource} not found'
assert acl[0].error == 'INVALID_REQUEST'

'''
The old config style has use_sasl at the top level, which enables
authorization. New config style has kafka_enable_authorization at the
Expand Down

0 comments on commit a3fa6a9

Please sign in to comment.