Skip to content

Commit

Permalink
Merge pull request #19791 from oleiman/vbotbuildovich/backport-18701-…
Browse files Browse the repository at this point in the history
…v23.3.x-200

[v23.3.x] [CORE-2845]: Ensure create_acls requests return an error when resources as topics are non valid kafka topic names
  • Loading branch information
michael-redpanda authored Jun 12, 2024
2 parents 9e0fd39 + a3fa6a9 commit a5260d3
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 10 deletions.
12 changes: 12 additions & 0 deletions src/v/kafka/server/handlers/details/security.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "kafka/protocol/schemata/delete_acls_request.h"
#include "kafka/protocol/schemata/describe_acls_request.h"
#include "kafka/server/request_context.h"
#include "model/validation.h"
#include "security/acl.h"
namespace kafka::details {

Expand Down Expand Up @@ -148,6 +149,17 @@ inline security::acl_binding to_acl_binding(const creatable_acl& acl) {
fmt::format("Invalid cluster name: {}", pattern.name()));
}

if (pattern.resource() == security::resource_type::topic) {
auto errc = model::validate_kafka_topic_name(
model::topic_view(pattern.name()));
if (pattern.name() != "*" && errc) {
throw acl_conversion_error(fmt::format(
"ACL topic {} does not conform to kafka topic schema: {}",
pattern.name(),
errc.message()));
}
}

security::acl_entry entry(
to_acl_principal(acl.principal),
to_acl_host(acl.host),
Expand Down
27 changes: 18 additions & 9 deletions tests/rptest/clients/rpk.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,15 @@ def _check_stdout_success(self, output):
if not status_line.endswith("OK"):
raise RpkException(f"Bad status: '{status_line}'")

def sasl_allow_principal(self,
principal,
operations,
resource,
resource_name,
username: Optional[str] = None,
password: Optional[str] = None,
mechanism: Optional[str] = None):
def _sasl_set_principal_access(self,
principal,
operations,
resource,
resource_name,
username: Optional[str] = None,
password: Optional[str] = None,
mechanism: Optional[str] = None,
deny=False):

username = username if username is not None else self._username
password = password if password is not None else self._password
Expand All @@ -377,14 +378,22 @@ def sasl_allow_principal(self,
else:
raise Exception(f"unknown resource: {resource}")

perm = '--allow-principal' if not deny else '--deny-principal'

cmd = [
"acl", "create", "--allow-principal", principal, "--operation",
"acl", "create", perm, principal, "--operation",
",".join(operations), resource, resource_name, "--brokers",
self._redpanda.brokers(), "--user", username, "--password",
password, "--sasl-mechanism", mechanism
] + self._tls_settings()
return self._run(cmd)

def sasl_allow_principal(self, *args, **kwargs):
return self._sasl_set_principal_access(*args, **kwargs, deny=False)

def sasl_deny_principal(self, *args, **kwargs):
return self._sasl_set_principal_access(*args, **kwargs, deny=True)

def allow_principal(self, principal, operations, resource, resource_name):
if resource == "topic":
resource = "--topic"
Expand Down
34 changes: 33 additions & 1 deletion tests/rptest/tests/acls_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.cluster import cluster
from rptest.services.admin import Admin
from rptest.clients.rpk import RpkTool, ClusterAuthorizationError, RpkException
from rptest.clients.rpk import RpkTool, ClusterAuthorizationError, RpkException, AclList
from rptest.services.redpanda import SecurityConfig, TLSProvider
from rptest.services.redpanda_installer import RedpandaInstaller, wait_for_num_versions
from rptest.services import tls
Expand Down Expand Up @@ -245,6 +245,38 @@ def check_super_user_perms():
timeout_sec=timeout_sec,
err_msg=f'super user: {err_msg}')

@cluster(num_nodes=3)
def test_invalid_acl_topic_name(self):
self.prepare_cluster(use_sasl=True, use_tls=False, authn_method=None)

# Ensure creating an ACL topic resource with a valid kafka topic name works
client = self.get_super_client()
resource = 'my_topic'
results = AclList.parse_raw(
client.sasl_allow_principal(principal='base',
operations=['all'],
resource='topic',
resource_name=resource))
self.redpanda.logger.info(f'{results._acls}')
assert results.has_permission(
'base', 'all', 'topic',
resource), f'Failed to create_acl for resource {resource}'

# Assert that appropriate error was returned by the server for invalid
# kafka topic names
resource = 'my bad topic name'
results = AclList.parse_raw(
client.sasl_allow_principal(principal='base',
operations=['all'],
resource='topic',
resource_name=resource))
acls = results._acls['base']
assert acls is not None, "Missing principal from create_acls result"

acl = [acl for acl in acls if acl.resource_name == resource]
assert len(acl) == 1, f'Expected match for {resource} not found'
assert acl[0].error == 'INVALID_REQUEST'

'''
The old config style has use_sasl at the top level, which enables
authorization. New config style has kafka_enable_authorization at the
Expand Down

0 comments on commit a5260d3

Please sign in to comment.