diff --git a/falco.yaml b/falco.yaml index 2a9769cf325..c235c3cbeba 100644 --- a/falco.yaml +++ b/falco.yaml @@ -39,6 +39,8 @@ # json_include_tags_property # buffered_outputs # outputs (throttling) +# rule_matching +# outputs_queue # Falco outputs channels # stdout_output # syslog_output @@ -304,6 +306,34 @@ outputs: # defined. rule_matching: first +# [Experimental] `outputs_queue` +# +# Falco utilizes tbb::concurrent_bounded_queue for handling outputs, and this parameter +# allows you to customize the queue capacity. Please refer to the official documentation: +# https://oneapi-src.github.io/oneTBB/main/tbb_userguide/Concurrent_Queue_Classes.html. +# On a healthy system with optimized Falco rules, the queue should not fill up. +# If it does, it is most likely happening due to the entire event flow being too slow, +# indicating that the server is under heavy load. +# +# Lowering the number of items can prevent memory from steadily increasing until the OOM +# killer stops the Falco process. We provide recovery actions to self-limit or self-kill +# in order to handle this situation earlier, similar to how we expose the kernel buffer size +# as a parameter. However, it will not address the root cause of the event pipe not keeping up. +# +# `capacity`: the maximum number of items allowed in the queue is determined by this value. +# Setting the value to 0 (which is the default) is equivalent to keeping the queue unbounded. +# In other words, when this configuration is set to 0, the number of allowed items is effectively +# set to the largest possible long value, disabling this setting. +# +# `recovery`: strategy to follow when the queue becomes filled up. It applies only when the +# queue is bounded and there is still available system memory. In the case of an unbounded +# queue, if the available memory on the system is consumed, the Falco process would be +# OOM killed. 
The value `exit` is the default, `continue` does nothing special and `empty` +# empties the queue and then continues. +outputs_queue: + capacity: 0 + recovery: exit + ########################## # Falco outputs channels # diff --git a/userspace/engine/falco_common.cpp b/userspace/engine/falco_common.cpp index 44b7a489ecf..9cf27d636b3 100644 --- a/userspace/engine/falco_common.cpp +++ b/userspace/engine/falco_common.cpp @@ -32,6 +32,12 @@ static std::vector rule_matching_names = { "all" }; +static std::vector outputs_queue_recovery_names = { /* position i must hold the name of the outputs_queue_recovery_type whose value is i: parse_queue_recovery() casts the matching index to that enum */ + "continue", + "exit", + "empty", +}; + bool falco_common::parse_priority(std::string v, priority_type& out) { for (size_t i = 0; i < priority_names.size(); i++) @@ -59,6 +65,19 @@ falco_common::priority_type falco_common::parse_priority(std::string v) return out; } +bool falco_common::parse_queue_recovery(std::string v, outputs_queue_recovery_type& out) /* Case-insensitively matches v against outputs_queue_recovery_names. On a match, stores the matching index (cast to the enum) in out and returns true; otherwise returns false and leaves out untouched. */ +{ + for (size_t i = 0; i < outputs_queue_recovery_names.size(); i++) + { + if (!strcasecmp(v.c_str(), outputs_queue_recovery_names[i].c_str())) + { + out = (outputs_queue_recovery_type) i; + return true; + } + } + return false; +} + bool falco_common::format_priority(priority_type v, std::string& out, bool shortfmt) { if ((size_t) v < priority_names.size()) diff --git a/userspace/engine/falco_common.h b/userspace/engine/falco_common.h index 6c63eec9d42..acc1030eb60 100644 --- a/userspace/engine/falco_common.h +++ b/userspace/engine/falco_common.h @@ -21,6 +21,12 @@ limitations under the License. #include #include +// +// equivalent to an "unbounded queue" in TBB terms or largest long value +// https://github.com/oneapi-src/oneTBB/blob/b2474bfc636937052d05daf8b3f4d6b76e20273a/include/oneapi/tbb/concurrent_queue.h#L554 +// +#define DEFAULT_OUTPUTS_QUEUE_CAPACITY_UNBOUNDED_MAX_LONG_VALUE std::ptrdiff_t(~size_t(0) / 2) + // // Most falco_* classes can throw exceptions. 
Unless directly related // to low-level failures like inability to open file, etc, they will @@ -52,6 +58,13 @@ struct falco_exception : std::exception namespace falco_common { + + enum outputs_queue_recovery_type { /* strategies selectable via `outputs_queue.recovery`; the values double as indices into outputs_queue_recovery_names (falco_common.cpp), keep both in sync */ + RECOVERY_CONTINUE = 0, /* `continue`: drop the event that could not be queued and keep running. */ + RECOVERY_EXIT = 1, /* `exit`: log an error and terminate the Falco process. */ + RECOVERY_EMPTY = 2, /* `empty`: drop the event and empty the queue, then keep running. */ + }; + const std::string syscall_source = sinsp_syscall_event_source_name; // Same as numbers/indices into the above vector @@ -69,6 +82,7 @@ namespace falco_common bool parse_priority(std::string v, priority_type& out); priority_type parse_priority(std::string v); + bool parse_queue_recovery(std::string v, outputs_queue_recovery_type& out); bool format_priority(priority_type v, std::string& out, bool shortfmt=false); std::string format_priority(priority_type v, bool shortfmt=false); diff --git a/userspace/falco/app/actions/init_outputs.cpp b/userspace/falco/app/actions/init_outputs.cpp index aebefe44da6..7fd32d52b83 100644 --- a/userspace/falco/app/actions/init_outputs.cpp +++ b/userspace/falco/app/actions/init_outputs.cpp @@ -63,6 +63,8 @@ falco::app::run_result falco::app::actions::init_outputs(falco::app::state& s) s.config->m_json_include_tags_property, s.config->m_output_timeout, s.config->m_buffered_outputs, + s.config->m_outputs_queue_capacity, + s.config->m_outputs_queue_recovery, s.config->m_time_format_iso_8601, hostname)); diff --git a/userspace/falco/configuration.cpp b/userspace/falco/configuration.cpp index a059d321cbf..1f09e582b26 100644 --- a/userspace/falco/configuration.cpp +++ b/userspace/falco/configuration.cpp @@ -40,6 +40,8 @@ falco_configuration::falco_configuration(): m_watch_config_files(true), m_rule_matching(falco_common::rule_matching::FIRST), m_buffered_outputs(false), + 
m_outputs_queue_capacity(DEFAULT_OUTPUTS_QUEUE_CAPACITY_UNBOUNDED_MAX_LONG_VALUE), + m_outputs_queue_recovery(falco_common::RECOVERY_EXIT), m_time_format_iso_8601(false), m_output_timeout(2000), m_grpc_enabled(false), @@ -258,6 +260,18 @@ void falco_configuration::load_yaml(const std::string& config_name, const yaml_h } m_buffered_outputs = config.get_scalar("buffered_outputs", false); + m_outputs_queue_capacity = config.get_scalar("outputs_queue.capacity", DEFAULT_OUTPUTS_QUEUE_CAPACITY_UNBOUNDED_MAX_LONG_VALUE); + // We use 0 in falco.yaml to indicate an unbounded queue; equivalent to the largest long value + if (m_outputs_queue_capacity == 0) + { + m_outputs_queue_capacity = DEFAULT_OUTPUTS_QUEUE_CAPACITY_UNBOUNDED_MAX_LONG_VALUE; + } + std::string recovery = config.get_scalar("outputs_queue.recovery", "exit"); + if (!falco_common::parse_queue_recovery(recovery, m_outputs_queue_recovery)) + { + throw std::logic_error("Unknown recovery \"" + recovery + "\"--must be one of exit, continue, empty"); + } + m_time_format_iso_8601 = config.get_scalar("time_format_iso_8601", false); falco_logger::log_stderr = config.get_scalar("log_stderr", false); diff --git a/userspace/falco/configuration.h b/userspace/falco/configuration.h index 1b3bf7f1a3e..c1a595d0a96 100644 --- a/userspace/falco/configuration.h +++ b/userspace/falco/configuration.h @@ -72,6 +72,8 @@ class falco_configuration bool m_watch_config_files; bool m_buffered_outputs; + size_t m_outputs_queue_capacity; + falco_common::outputs_queue_recovery_type m_outputs_queue_recovery; bool m_time_format_iso_8601; uint32_t m_output_timeout; diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index a8353546c69..9425e5ad2bb 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -19,7 +19,6 @@ limitations under the License. 
#endif #include "falco_outputs.h" - #include "config_falco.h" #include "formats.h" @@ -47,6 +46,8 @@ falco_outputs::falco_outputs( bool json_include_tags_property, uint32_t timeout, bool buffered, + size_t outputs_queue_capacity, + falco_common::outputs_queue_recovery_type outputs_queue_recovery, bool time_format_iso_8601, const std::string& hostname) { @@ -64,7 +65,10 @@ falco_outputs::falco_outputs( { add_output(output); } + m_outputs_queue_num_drops = {0}; + m_outputs_queue_recovery = outputs_queue_recovery; #ifndef __EMSCRIPTEN__ + m_queue.set_capacity(outputs_queue_capacity); m_worker_thread = std::thread(&falco_outputs::worker, this); #endif } @@ -274,8 +278,25 @@ inline void falco_outputs::push(const ctrl_msg& cmsg) #ifndef __EMSCRIPTEN__ if (!m_queue.try_push(cmsg)) { - fprintf(stderr, "Fatal error: Output queue reached maximum capacity. Exiting.\n"); - exit(EXIT_FAILURE); + switch (m_outputs_queue_recovery) + { + case falco_common::RECOVERY_EXIT: + falco_logger::log(LOG_ERR, "Fatal error: Output queue out of memory. Exiting ..."); + exit(EXIT_FAILURE); + case falco_common::RECOVERY_EMPTY: + m_outputs_queue_num_drops += m_queue.size() + 1; + falco_logger::log(LOG_ERR, "Output queue out of memory. Drop event plus events in queue due to emptying the queue; continue on ..."); + m_queue.empty(); + break; + case falco_common::RECOVERY_CONTINUE: + m_outputs_queue_num_drops++; + falco_logger::log(LOG_ERR, "Output queue out of memory. Drop event and continue on ..."); + break; + default: + falco_logger::log(LOG_ERR, "Fatal error: strategy unknown. 
Exiting ..."); + exit(EXIT_FAILURE); + break; + } } #else for (auto o : m_outputs) @@ -339,3 +360,8 @@ inline void falco_outputs::process_msg(falco::outputs::abstract_output* o, const falco_logger::log(LOG_DEBUG, "Outputs worker received an unknown message type\n"); } } + +uint64_t falco_outputs::get_outputs_queue_num_drops() +{ + return m_outputs_queue_num_drops.load(); +} diff --git a/userspace/falco/falco_outputs.h b/userspace/falco/falco_outputs.h index 7e99453d769..45d9314166c 100644 --- a/userspace/falco/falco_outputs.h +++ b/userspace/falco/falco_outputs.h @@ -48,6 +48,8 @@ class falco_outputs bool json_include_tags_property, uint32_t timeout, bool buffered, + size_t outputs_queue_capacity, + falco_common::outputs_queue_recovery_type outputs_queue_recovery, bool time_format_iso_8601, const std::string& hostname); @@ -83,6 +85,12 @@ class falco_outputs */ void reopen_outputs(); + /*! + \brief Return the number of currently dropped events as a result of failed push attempts + into the outputs queue when using `continue` or `empty` recovery strategies. + */ + uint64_t get_outputs_queue_num_drops(); + private: std::unique_ptr m_formats; @@ -112,6 +120,8 @@ class falco_outputs falco_outputs_cbq m_queue; #endif + falco_common::outputs_queue_recovery_type m_outputs_queue_recovery; + std::atomic m_outputs_queue_num_drops; std::thread m_worker_thread; inline void push(const ctrl_msg& cmsg); inline void push_ctrl(ctrl_msg_type cmt); diff --git a/userspace/falco/stats_writer.cpp b/userspace/falco/stats_writer.cpp index e1215dda95e..c4d597aa571 100644 --- a/userspace/falco/stats_writer.cpp +++ b/userspace/falco/stats_writer.cpp @@ -106,6 +106,9 @@ stats_writer::stats_writer( if (m_initialized) { #ifndef __EMSCRIPTEN__ + // capacity and controls should not be relevant for stats outputs, adopt capacity + // for completeness, but do not implement config recovery strategies. 
+ m_queue.set_capacity(config->m_outputs_queue_capacity); m_worker = std::thread(&stats_writer::worker, this); #endif } @@ -228,6 +231,7 @@ void stats_writer::collector::get_metrics_output_fields_wrapper( output_fields["falco.host_boot_ts"] = machine_info->boot_ts_epoch; output_fields["falco.hostname"] = machine_info->hostname; /* Explicitly add hostname to log msg in case hostname rule output field is disabled. */ output_fields["falco.host_num_cpus"] = machine_info->num_cpus; + output_fields["falco.outputs_queue_num_drops"] = m_writer->m_outputs->get_outputs_queue_num_drops(); output_fields["evt.source"] = src; for (size_t i = 0; i < sizeof(all_driver_engines) / sizeof(const char*); i++)