diff --git a/src/v/security/audit/audit_log_manager.cc b/src/v/security/audit/audit_log_manager.cc index fd1c696df5c66..c84f3ebc5a93d 100644 --- a/src/v/security/audit/audit_log_manager.cc +++ b/src/v/security/audit/audit_log_manager.cc @@ -522,8 +522,10 @@ ss::future<> audit_client::produce( // here should usually be 1-2, since the default per-shard queue limit // is 1MiB, which is also the default for kafka_batch_max_bytes. // TODO(oren): a configurabale ratio might be better - [[maybe_unused]] auto max_concurrency - = _max_buffer_size / config::shard_local_cfg().kafka_batch_max_bytes(); + auto max_concurrency = std::clamp( + _max_buffer_size / config::shard_local_cfg().kafka_batch_max_bytes(), + 1, + records.size()); try { ssx::spawn_with_gate( diff --git a/tests/rptest/tests/audit_log_test.py b/tests/rptest/tests/audit_log_test.py index 04682c37b1d72..00d6c3dcdc8f1 100644 --- a/tests/rptest/tests/audit_log_test.py +++ b/tests/rptest/tests/audit_log_test.py @@ -2211,3 +2211,43 @@ def test_sr_audit_bad_authz(self): lambda record: self.match_api_record(record, "mode", StatusID. FAILURE), lambda aggregate_count: aggregate_count >= 1, 'API call') + + +class AuditLogTestReproducer(AuditLogTestBase): + """Reproducer and regression test for a bug in the audit logging client where having kafka_batch_max_bytes > audit_client_max_buffer_size lead to no audit messages being produced and the audit log buffers filling up.""" + def __init__(self, test_context): + + super(AuditLogTestReproducer, self).__init__( + test_context=test_context, + audit_log_config=AuditLogConfig(num_partitions=1, + event_types=['management']), + extra_rp_conf={ + "kafka_batch_max_bytes": "26214400", + "audit_client_max_buffer_size": "16777216", + }, + log_config=LoggingConfig('info', + logger_levels={ + 'auditing': 'trace', + 'kafka': 'trace', + 'kafka/client': 'trace', + 'admin_api_server': 'trace', + })) + + @skip_fips_mode + @cluster(num_nodes=5) + def test_sanctioning_mode(self): + self.redpanda.logger.debug("Triggering an audit log event") + created_topic = "created_topic" + self.super_rpk.create_topic(topic=created_topic) + + def matches_topic_creation(record): + return record['class_uid'] == 6003 \ + and record['api']['service']['name'] == self.kafka_rpc_service_name \ + and {'name': created_topic, 'type': 'topic'} in record['resources'] + + records = self.find_matching_record( + matches_topic_creation, lambda record_count: record_count >= 1, + "Expected to observe a management API event for the topic creation" + ) + assert len(records) > 0, \ + f'Did not receive any audit records for topic {created_topic}'