Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.3.x] [CORE-2845]: Ensure create_acls requests return an error when resources as topics are non valid kafka topic names #19791

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/v/kafka/server/handlers/details/security.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "kafka/protocol/schemata/delete_acls_request.h"
#include "kafka/protocol/schemata/describe_acls_request.h"
#include "kafka/server/request_context.h"
#include "model/validation.h"
#include "security/acl.h"
namespace kafka::details {

Expand Down Expand Up @@ -148,6 +149,17 @@ inline security::acl_binding to_acl_binding(const creatable_acl& acl) {
fmt::format("Invalid cluster name: {}", pattern.name()));
}

if (pattern.resource() == security::resource_type::topic) {
auto errc = model::validate_kafka_topic_name(
model::topic_view(pattern.name()));
if (pattern.name() != "*" && errc) {
throw acl_conversion_error(fmt::format(
"ACL topic {} does not conform to kafka topic schema: {}",
pattern.name(),
errc.message()));
}
}

security::acl_entry entry(
to_acl_principal(acl.principal),
to_acl_host(acl.host),
Expand Down
27 changes: 18 additions & 9 deletions tests/rptest/clients/rpk.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,15 @@ def _check_stdout_success(self, output):
if not status_line.endswith("OK"):
raise RpkException(f"Bad status: '{status_line}'")

def sasl_allow_principal(self,
principal,
operations,
resource,
resource_name,
username: Optional[str] = None,
password: Optional[str] = None,
mechanism: Optional[str] = None):
def _sasl_set_principal_access(self,
principal,
operations,
resource,
resource_name,
username: Optional[str] = None,
password: Optional[str] = None,
mechanism: Optional[str] = None,
deny=False):

username = username if username is not None else self._username
password = password if password is not None else self._password
Expand All @@ -377,14 +378,22 @@ def sasl_allow_principal(self,
else:
raise Exception(f"unknown resource: {resource}")

perm = '--allow-principal' if not deny else '--deny-principal'

cmd = [
"acl", "create", "--allow-principal", principal, "--operation",
"acl", "create", perm, principal, "--operation",
",".join(operations), resource, resource_name, "--brokers",
self._redpanda.brokers(), "--user", username, "--password",
password, "--sasl-mechanism", mechanism
] + self._tls_settings()
return self._run(cmd)

def sasl_allow_principal(self, *args, **kwargs):
return self._sasl_set_principal_access(*args, **kwargs, deny=False)

def sasl_deny_principal(self, *args, **kwargs):
return self._sasl_set_principal_access(*args, **kwargs, deny=True)

def allow_principal(self, principal, operations, resource, resource_name):
if resource == "topic":
resource = "--topic"
Expand Down
34 changes: 33 additions & 1 deletion tests/rptest/tests/acls_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.cluster import cluster
from rptest.services.admin import Admin
from rptest.clients.rpk import RpkTool, ClusterAuthorizationError, RpkException
from rptest.clients.rpk import RpkTool, ClusterAuthorizationError, RpkException, AclList
from rptest.services.redpanda import SecurityConfig, TLSProvider
from rptest.services.redpanda_installer import RedpandaInstaller, wait_for_num_versions
from rptest.services import tls
Expand Down Expand Up @@ -245,6 +245,38 @@ def check_super_user_perms():
timeout_sec=timeout_sec,
err_msg=f'super user: {err_msg}')

@cluster(num_nodes=3)
def test_invalid_acl_topic_name(self):
self.prepare_cluster(use_sasl=True, use_tls=False, authn_method=None)

# Ensure creating an ACL topic resource with a valid kafka topic name works
client = self.get_super_client()
resource = 'my_topic'
results = AclList.parse_raw(
client.sasl_allow_principal(principal='base',
operations=['all'],
resource='topic',
resource_name=resource))
self.redpanda.logger.info(f'{results._acls}')
assert results.has_permission(
'base', 'all', 'topic',
resource), f'Failed to create_acl for resource {resource}'

# Assert that appropriate error was returned by the server for invalid
# kafka topic names
resource = 'my bad topic name'
results = AclList.parse_raw(
client.sasl_allow_principal(principal='base',
operations=['all'],
resource='topic',
resource_name=resource))
acls = results._acls['base']
assert acls is not None, "Missing principal from create_acls result"

acl = [acl for acl in acls if acl.resource_name == resource]
assert len(acl) == 1, f'Expected match for {resource} not found'
assert acl[0].error == 'INVALID_REQUEST'

'''
The old config style has use_sasl at the top level, which enables
authorization. New config style has kafka_enable_authorization at the
Expand Down
Loading