diff --git a/src/v/security/audit/audit_log_manager.cc b/src/v/security/audit/audit_log_manager.cc index 80b87e0913b5..fd1c696df5c6 100644 --- a/src/v/security/audit/audit_log_manager.cc +++ b/src/v/security/audit/audit_log_manager.cc @@ -739,6 +739,10 @@ audit_log_manager::audit_log_manager( , _controller(controller) , _config(client_config) , _metadata_cache(metadata_cache) { + _probe.setup_metrics([this] { + return 1.0 + - (static_cast(_queue_bytes_sem.available_units()) / static_cast(_max_queue_size_bytes)); + }); if (ss::this_shard_id() == client_shard_id) { _sink = std::make_unique(this, controller, client_config); } @@ -808,11 +812,6 @@ ss::future<> audit_log_manager::start() { "Redpanda is operating in recovery mode. Auditing is disabled!"); co_return; } - _probe = std::make_unique(); - _probe->setup_metrics([this] { - return 1.0 - - (static_cast(_queue_bytes_sem.available_units()) / static_cast(_max_queue_size_bytes)); - }); if (ss::this_shard_id() != client_shard_id) { co_return; } @@ -846,7 +845,6 @@ ss::future<> audit_log_manager::stop() { /// Gate may already be closed if ::pause() had been called co_await _gate.close(); } - _probe.reset(nullptr); if (_queue.size() > 0) { vlog( adtlog.debug, @@ -1006,7 +1004,7 @@ audit_log_manager::should_enqueue_audit_event() const { adtlog.warn, "Audit message passthrough active until cluster has been fully " "upgraded to the min supported version for audit_logging"); - _probe->audit_error(); + _probe.audit_error(); return std::make_optional(audit_event_passthrough::yes); } if (_auth_misconfigured) { diff --git a/src/v/security/audit/audit_log_manager.h b/src/v/security/audit/audit_log_manager.h index d9caa8224e54..06be3a1a0098 100644 --- a/src/v/security/audit/audit_log_manager.h +++ b/src/v/security/audit/audit_log_manager.h @@ -283,7 +283,7 @@ class audit_log_manager return true; } - audit_probe& probe() { return *_probe; } + audit_probe& probe() { return _probe; } template auto restrict_topics(Func&& func) const noexcept { @@ -401,6 +401,10 @@ class audit_log_manager underlying_t _queue; ssx::semaphore _active_drain{1, "audit-drain"}; + // Probe is mutable so it can be modified in const methods when they need to + // report auditing failures + mutable audit_probe _probe; + /// Single instance contains a kafka::client::client instance. friend class audit_sink; std::unique_ptr _sink; @@ -409,7 +413,6 @@ class audit_log_manager model::node_id _self; cluster::controller* _controller; kafka::client::configuration& _config; - std::unique_ptr _probe; ss::sharded* _metadata_cache; };