Skip to content

Commit

Permalink
Merge pull request #24148 from pgellert/manual-backport-24137-v24.2.x…
Browse files Browse the repository at this point in the history
…-101

[v24.2.x] audit: clamp audit client max parallelism
  • Loading branch information
michael-redpanda authored Nov 16, 2024
2 parents 4a9144b + bc6ee9d commit 02513ee
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
6 changes: 4 additions & 2 deletions src/v/security/audit/audit_log_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,10 @@ ss::future<> audit_client::produce(
// here should usually be 1-2, since the default per-shard queue limit
// is 1MiB, which is also the default for kafka_batch_max_bytes.
// TODO(oren): a configurabale ratio might be better
[[maybe_unused]] auto max_concurrency
= _max_buffer_size / config::shard_local_cfg().kafka_batch_max_bytes();
auto max_concurrency = std::clamp<size_t>(
_max_buffer_size / config::shard_local_cfg().kafka_batch_max_bytes(),
1,
records.size());

try {
ssx::spawn_with_gate(
Expand Down
40 changes: 40 additions & 0 deletions tests/rptest/tests/audit_log_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2211,3 +2211,43 @@ def test_sr_audit_bad_authz(self):
lambda record: self.match_api_record(record, "mode", StatusID.
FAILURE),
lambda aggregate_count: aggregate_count >= 1, 'API call')


class AuditLogTestReproducer(AuditLogTestBase):
"""Reproducer and regression test for a bug in the audit logging client where having kafka_batch_max_bytes > audit_client_max_buffer_size lead to no audit messages being produced and the audit log buffers filling up."""
def __init__(self, test_context):

super(AuditLogTestReproducer, self).__init__(
test_context=test_context,
audit_log_config=AuditLogConfig(num_partitions=1,
event_types=['management']),
extra_rp_conf={
"kafka_batch_max_bytes": "26214400",
"audit_client_max_buffer_size": "16777216",
},
log_config=LoggingConfig('info',
logger_levels={
'auditing': 'trace',
'kafka': 'trace',
'kafka/client': 'trace',
'admin_api_server': 'trace',
}))

@skip_fips_mode
@cluster(num_nodes=5)
def test_sanctioning_mode(self):
self.redpanda.logger.debug("Triggering an audit log event")
created_topic = "created_topic"
self.super_rpk.create_topic(topic=created_topic)

def matches_topic_creation(record):
return record['class_uid'] == 6003 \
and record['api']['service']['name'] == self.kafka_rpc_service_name \
and {'name': created_topic, 'type': 'topic'} in record['resources']

records = self.find_matching_record(
matches_topic_creation, lambda record_count: record_count >= 1,
"Expected to observe a management API event for the topic creation"
)
assert len(records) > 0, \
f'Did not receive any audit records for topic {created_topic}'

0 comments on commit 02513ee

Please sign in to comment.