From 536572bf11f843e6a39a7da44c44c459f078a637 Mon Sep 17 00:00:00 2001 From: Melissa Kilby Date: Tue, 1 Aug 2023 05:02:45 +0000 Subject: [PATCH 01/10] fix(outputs): expose queue_capacity_outputs config for memory control Signed-off-by: Melissa Kilby --- falco.yaml | 22 ++++++++++++++++++++ userspace/falco/app/actions/init_outputs.cpp | 2 ++ userspace/falco/configuration.cpp | 5 +++++ userspace/falco/configuration.h | 2 ++ userspace/falco/configuration_aux.h | 22 ++++++++++++++++++++ userspace/falco/falco_outputs.cpp | 20 +++++++++++++++--- userspace/falco/falco_outputs.h | 3 +++ userspace/falco/stats_writer.cpp | 3 +++ 8 files changed, 76 insertions(+), 3 deletions(-) create mode 100644 userspace/falco/configuration_aux.h diff --git a/falco.yaml b/falco.yaml index 2a9769cf325..fbedb579667 100644 --- a/falco.yaml +++ b/falco.yaml @@ -39,6 +39,7 @@ # json_include_tags_property # buffered_outputs # outputs (throttling) +# queue_capacity_outputs # Falco outputs channels # stdout_output # syslog_output @@ -304,6 +305,27 @@ outputs: # defined. rule_matching: first +# [Experimental] `queue_capacity_outputs` +# +# Falco utilizes tbb::concurrent_bounded_queue for the outputs, and this parameter +# allows you to customize the capacity. Refer to the official documentation: +# https://oneapi-src.github.io/oneTBB/main/tbb_userguide/Concurrent_Queue_Classes.html. +# On a healthy system with tuned Falco rules, the queue should not fill up. +# If it does, it most likely happens if the entire event flow is too slow. This +# could indicate that the server is under heavy load. +# +# Lowering the number of items can prevent steadily increasing memory until the OOM +# killer stops the Falco process. We expose recovery actions to self-limit or self +# OOM kill earlier similar to how we expose the kernel buffer size as parameter. +# However, it will not address the root cause of the event pipe not holding up. 
+queue_capacity_outputs: + # number of max items in queue + items: 1000000 + # continue: 0 (default) + # exit: 1 + # empty queue then continue: 2 + recovery: 0 + ########################## # Falco outputs channels # diff --git a/userspace/falco/app/actions/init_outputs.cpp b/userspace/falco/app/actions/init_outputs.cpp index aebefe44da6..3f415140d64 100644 --- a/userspace/falco/app/actions/init_outputs.cpp +++ b/userspace/falco/app/actions/init_outputs.cpp @@ -63,6 +63,8 @@ falco::app::run_result falco::app::actions::init_outputs(falco::app::state& s) s.config->m_json_include_tags_property, s.config->m_output_timeout, s.config->m_buffered_outputs, + s.config->m_queue_capacity_outputs_items, + s.config->m_queue_capacity_outputs_recovery, s.config->m_time_format_iso_8601, hostname)); diff --git a/userspace/falco/configuration.cpp b/userspace/falco/configuration.cpp index a059d321cbf..d94ccf77486 100644 --- a/userspace/falco/configuration.cpp +++ b/userspace/falco/configuration.cpp @@ -28,6 +28,7 @@ limitations under the License. 
#include "falco_utils.h" #include "configuration.h" +#include "configuration_aux.h" #include "logger.h" #include "banned.h" // This raises a compilation error when certain functions are used @@ -40,6 +41,8 @@ falco_configuration::falco_configuration(): m_watch_config_files(true), m_rule_matching(falco_common::rule_matching::FIRST), m_buffered_outputs(false), + m_queue_capacity_outputs_items(DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS), + m_queue_capacity_outputs_recovery(RECOVERY_DROP_CURRENT), m_time_format_iso_8601(false), m_output_timeout(2000), m_grpc_enabled(false), @@ -258,6 +261,8 @@ void falco_configuration::load_yaml(const std::string& config_name, const yaml_h } m_buffered_outputs = config.get_scalar("buffered_outputs", false); + m_queue_capacity_outputs_items = config.get_scalar("queue_capacity_outputs.items", DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS); + m_queue_capacity_outputs_recovery = config.get_scalar("queue_capacity_outputs.recovery", RECOVERY_DROP_CURRENT); m_time_format_iso_8601 = config.get_scalar("time_format_iso_8601", false); falco_logger::log_stderr = config.get_scalar("log_stderr", false); diff --git a/userspace/falco/configuration.h b/userspace/falco/configuration.h index 1b3bf7f1a3e..1f0579c2aaa 100644 --- a/userspace/falco/configuration.h +++ b/userspace/falco/configuration.h @@ -72,6 +72,8 @@ class falco_configuration bool m_watch_config_files; bool m_buffered_outputs; + size_t m_queue_capacity_outputs_items; + uint32_t m_queue_capacity_outputs_recovery; bool m_time_format_iso_8601; uint32_t m_output_timeout; diff --git a/userspace/falco/configuration_aux.h b/userspace/falco/configuration_aux.h new file mode 100644 index 00000000000..eb9344a6d31 --- /dev/null +++ b/userspace/falco/configuration_aux.h @@ -0,0 +1,22 @@ +/* +Copyright (C) 2023 The Falco Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#pragma once + +#define DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS 1000000UL + +enum outputs_recovery_code { + RECOVERY_DROP_CURRENT = 0, /* queue_capacity_outputs recovery strategy of continuing on. */ + RECOVERY_EXIT = 1, /* queue_capacity_outputs recovery strategy of exiting, self OOM kill. */ + RECOVERY_EMPTY = 2, /* queue_capacity_outputs recovery strategy of emptying queue then continuing. */ +}; diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index a8353546c69..c5513ccbacb 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -19,8 +19,8 @@ limitations under the License. #endif #include "falco_outputs.h" - #include "config_falco.h" +#include "configuration_aux.h" #include "formats.h" #include "logger.h" @@ -47,6 +47,8 @@ falco_outputs::falco_outputs( bool json_include_tags_property, uint32_t timeout, bool buffered, + size_t queue_capacity_outputs_items, + uint32_t queue_capacity_outputs_recovery, bool time_format_iso_8601, const std::string& hostname) { @@ -66,6 +68,8 @@ falco_outputs::falco_outputs( } #ifndef __EMSCRIPTEN__ m_worker_thread = std::thread(&falco_outputs::worker, this); + m_queue.set_capacity(queue_capacity_outputs_items); + m_recovery = queue_capacity_outputs_recovery; #endif } @@ -274,8 +278,18 @@ inline void falco_outputs::push(const ctrl_msg& cmsg) #ifndef __EMSCRIPTEN__ if (!m_queue.try_push(cmsg)) { - fprintf(stderr, "Fatal error: Output queue reached maximum capacity. 
Exiting.\n"); - exit(EXIT_FAILURE); + switch (m_recovery) + { + case RECOVERY_EXIT: + fprintf(stderr, "Fatal error: Output queue reached maximum capacity. Exiting ... \n"); + exit(EXIT_FAILURE); + case RECOVERY_EMPTY: + fprintf(stderr, "Output queue reached maximum capacity. Empty queue and continue ... \n"); + m_queue.empty(); + default: + fprintf(stderr, "Output queue reached maximum capacity. Continue on ... \n"); + break; + } } #else for (auto o : m_outputs) diff --git a/userspace/falco/falco_outputs.h b/userspace/falco/falco_outputs.h index 7e99453d769..f8ca285076f 100644 --- a/userspace/falco/falco_outputs.h +++ b/userspace/falco/falco_outputs.h @@ -48,6 +48,8 @@ class falco_outputs bool json_include_tags_property, uint32_t timeout, bool buffered, + size_t queue_capacity_outputs_items, + uint32_t queue_capacity_outputs_recovery, bool time_format_iso_8601, const std::string& hostname); @@ -110,6 +112,7 @@ class falco_outputs #ifndef __EMSCRIPTEN__ typedef tbb::concurrent_bounded_queue falco_outputs_cbq; falco_outputs_cbq m_queue; + uint32_t m_recovery; #endif std::thread m_worker_thread; diff --git a/userspace/falco/stats_writer.cpp b/userspace/falco/stats_writer.cpp index e1215dda95e..dd994e32ad3 100644 --- a/userspace/falco/stats_writer.cpp +++ b/userspace/falco/stats_writer.cpp @@ -87,6 +87,9 @@ stats_writer::stats_writer( : m_initialized(false), m_total_samples(0) { m_config = config; + // capacity and controls should not be relevant for stats outputs, adopt capacity + // for completeness, but do not implement config recovery strategies. 
+ m_queue.set_capacity(config->m_queue_capacity_outputs_items); if (config->m_metrics_enabled) { if (!config->m_metrics_output_file.empty()) From 63c25bd45601b5e9a50a4b702522fb5a41240080 Mon Sep 17 00:00:00 2001 From: Melissa Kilby Date: Wed, 2 Aug 2023 03:22:26 +0000 Subject: [PATCH 02/10] cleanup(outputs): ensure old defaults in queue_capacity_outputs in new config Co-authored-by: Leonardo Grasso Signed-off-by: Melissa Kilby --- falco.yaml | 39 +++++++++++++++++------------ userspace/falco/configuration.cpp | 4 +-- userspace/falco/configuration_aux.h | 2 +- userspace/falco/falco_outputs.cpp | 12 ++++++--- userspace/falco/stats_writer.cpp | 5 +++- 5 files changed, 38 insertions(+), 24 deletions(-) diff --git a/falco.yaml b/falco.yaml index fbedb579667..be04e11a09f 100644 --- a/falco.yaml +++ b/falco.yaml @@ -307,24 +307,31 @@ rule_matching: first # [Experimental] `queue_capacity_outputs` # -# Falco utilizes tbb::concurrent_bounded_queue for the outputs, and this parameter -# allows you to customize the capacity. Refer to the official documentation: +# Falco utilizes tbb::concurrent_bounded_queue for handling outputs, and this parameter +# allows you to customize the queue capacity. Please refer to the official documentation: # https://oneapi-src.github.io/oneTBB/main/tbb_userguide/Concurrent_Queue_Classes.html. -# On a healthy system with tuned Falco rules, the queue should not fill up. -# If it does, it most likely happens if the entire event flow is too slow. This -# could indicate that the server is under heavy load. -# -# Lowering the number of items can prevent steadily increasing memory until the OOM -# killer stops the Falco process. We expose recovery actions to self-limit or self -# OOM kill earlier similar to how we expose the kernel buffer size as parameter. -# However, it will not address the root cause of the event pipe not holding up. +# On a healthy system with optimized Falco rules, the queue should not fill up. 
+# If it does, the most likely cause is that the entire event flow is too slow, +# which indicates that the server is under heavy load. +# +# Lowering the number of items can prevent memory from steadily increasing until the OOM +# killer stops the Falco process. We provide recovery actions to self-limit or self-kill +# in order to handle this situation earlier, similar to how we expose the kernel buffer size +# as a parameter. +# However, it will not address the root cause of the event pipe not keeping up. +# +# `items`: the maximum number of items allowed in the queue, defaulting to 0. This means that +# the queue is unbounded. +# You can experiment with values greater or smaller than the anchor value 1000000. +# +# `recovery`: the strategy to follow when the queue becomes filled up. This also applies when +# the queue is unbounded, and all available memory on the system is consumed. +# recovery: 0 means continue. +# recovery: 1 means simply exit (default behavior). +# recovery: 2 means empty the queue and then continue. 
queue_capacity_outputs: - # number of max items in queue - items: 1000000 - # continue: 0 (default) - # exit: 1 - # empty queue then continue: 2 - recovery: 0 + items: 0 + recovery: 1 ########################## diff --git a/userspace/falco/configuration.cpp b/userspace/falco/configuration.cpp index d94ccf77486..caf945e7fbb 100644 --- a/userspace/falco/configuration.cpp +++ b/userspace/falco/configuration.cpp @@ -42,7 +42,7 @@ falco_configuration::falco_configuration(): m_rule_matching(falco_common::rule_matching::FIRST), m_buffered_outputs(false), m_queue_capacity_outputs_items(DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS), - m_queue_capacity_outputs_recovery(RECOVERY_DROP_CURRENT), + m_queue_capacity_outputs_recovery(RECOVERY_EXIT), m_time_format_iso_8601(false), m_output_timeout(2000), m_grpc_enabled(false), @@ -262,7 +262,7 @@ void falco_configuration::load_yaml(const std::string& config_name, const yaml_h m_buffered_outputs = config.get_scalar("buffered_outputs", false); m_queue_capacity_outputs_items = config.get_scalar("queue_capacity_outputs.items", DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS); - m_queue_capacity_outputs_recovery = config.get_scalar("queue_capacity_outputs.recovery", RECOVERY_DROP_CURRENT); + m_queue_capacity_outputs_recovery = config.get_scalar("queue_capacity_outputs.recovery", RECOVERY_EXIT); m_time_format_iso_8601 = config.get_scalar("time_format_iso_8601", false); falco_logger::log_stderr = config.get_scalar("log_stderr", false); diff --git a/userspace/falco/configuration_aux.h b/userspace/falco/configuration_aux.h index eb9344a6d31..31504e30041 100644 --- a/userspace/falco/configuration_aux.h +++ b/userspace/falco/configuration_aux.h @@ -13,7 +13,7 @@ limitations under the License. #pragma once -#define DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS 1000000UL +#define DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS 0 enum outputs_recovery_code { RECOVERY_DROP_CURRENT = 0, /* queue_capacity_outputs recovery strategy of continuing on. 
*/ diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index c5513ccbacb..18fd04a9fc3 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -68,7 +68,11 @@ falco_outputs::falco_outputs( } #ifndef __EMSCRIPTEN__ m_worker_thread = std::thread(&falco_outputs::worker, this); - m_queue.set_capacity(queue_capacity_outputs_items); + if (queue_capacity_outputs_items > 0) + { + m_queue.set_capacity(queue_capacity_outputs_items); + } + m_recovery = queue_capacity_outputs_recovery; #endif } @@ -281,13 +285,13 @@ inline void falco_outputs::push(const ctrl_msg& cmsg) switch (m_recovery) { case RECOVERY_EXIT: - fprintf(stderr, "Fatal error: Output queue reached maximum capacity. Exiting ... \n"); + fprintf(stderr, "Fatal error: Output queue out of memory. Exiting ... \n"); exit(EXIT_FAILURE); case RECOVERY_EMPTY: - fprintf(stderr, "Output queue reached maximum capacity. Empty queue and continue ... \n"); + fprintf(stderr, "Output queue out of memory. Empty queue and continue ... \n"); m_queue.empty(); default: - fprintf(stderr, "Output queue reached maximum capacity. Continue on ... \n"); + fprintf(stderr, "Output queue out of memory. Continue on ... \n"); break; } } diff --git a/userspace/falco/stats_writer.cpp b/userspace/falco/stats_writer.cpp index dd994e32ad3..897f58d558b 100644 --- a/userspace/falco/stats_writer.cpp +++ b/userspace/falco/stats_writer.cpp @@ -89,7 +89,10 @@ stats_writer::stats_writer( m_config = config; // capacity and controls should not be relevant for stats outputs, adopt capacity // for completeness, but do not implement config recovery strategies. 
- m_queue.set_capacity(config->m_queue_capacity_outputs_items); + if (config->m_queue_capacity_outputs_items > 0) + { + m_queue.set_capacity(config->m_queue_capacity_outputs_items); + } if (config->m_metrics_enabled) { if (!config->m_metrics_output_file.empty()) From 55bd52766c8936313e0cc0eda6b1d70cec7c964e Mon Sep 17 00:00:00 2001 From: Melissa Kilby Date: Thu, 3 Aug 2023 03:04:15 +0000 Subject: [PATCH 03/10] cleanup(outputs): adopt different style for outputs_queue params encodings Co-authored-by: Leonardo Grasso Signed-off-by: Melissa Kilby --- falco.yaml | 20 +++++++++--------- userspace/engine/falco_common.cpp | 21 +++++++++++++++++++ userspace/engine/falco_common.h | 10 +++++++++ userspace/falco/app/actions/init_outputs.cpp | 4 ++-- userspace/falco/configuration.cpp | 14 ++++++++----- userspace/falco/configuration.h | 4 ++-- userspace/falco/configuration_aux.h | 22 -------------------- userspace/falco/falco_outputs.cpp | 17 +++++++-------- userspace/falco/falco_outputs.h | 4 ++-- userspace/falco/stats_writer.cpp | 4 ++-- 10 files changed, 66 insertions(+), 54 deletions(-) delete mode 100644 userspace/falco/configuration_aux.h diff --git a/falco.yaml b/falco.yaml index be04e11a09f..c64e9f9e1bd 100644 --- a/falco.yaml +++ b/falco.yaml @@ -39,7 +39,8 @@ # json_include_tags_property # buffered_outputs # outputs (throttling) -# queue_capacity_outputs +# rule_matching +# outputs_queue # Falco outputs channels # stdout_output # syslog_output @@ -305,7 +306,7 @@ outputs: # defined. rule_matching: first -# [Experimental] `queue_capacity_outputs` +# [Experimental] `outputs_queue` # # Falco utilizes tbb::concurrent_bounded_queue for handling outputs, and this parameter # allows you to customize the queue capacity. Please refer to the official documentation: @@ -320,18 +321,17 @@ rule_matching: first # as a parameter. # However, it will not address the root cause of the event pipe not keeping up. 
# -# `items`: the maximum number of items allowed in the queue, defaulting to 0. This means that -# the queue is unbounded. +# `capacity`: the maximum number of items allowed in the queue, defaulting to 0. This means that +# the queue remains unbounded aka this setting is disabled. # You can experiment with values greater or smaller than the anchor value 1000000. # # `recovery`: the strategy to follow when the queue becomes filled up. This also applies when # the queue is unbounded, and all available memory on the system is consumed. -# recovery: 0 means continue. -# recovery: 1 means simply exit (default behavior). -# recovery: 2 means empty the queue and then continue. -queue_capacity_outputs: - items: 0 - recovery: 1 +# `exit` is default, `continue` does nothing special and `empty` empties the queue and then +# continues. +outputs_queue: + capacity: 0 + recovery: exit ########################## diff --git a/userspace/engine/falco_common.cpp b/userspace/engine/falco_common.cpp index 44b7a489ecf..92fddf12e1d 100644 --- a/userspace/engine/falco_common.cpp +++ b/userspace/engine/falco_common.cpp @@ -27,10 +27,18 @@ static std::vector priority_names = { "Debug" }; +<<<<<<< HEAD static std::vector rule_matching_names = { "first", "all" }; +======= +static std::vector outputs_recovery_names = { + "continue", + "exit", + "empty", + }; +>>>>>>> 92bd5767 (cleanup(outputs): adopt different style for outputs_queue params encodings) bool falco_common::parse_priority(std::string v, priority_type& out) { @@ -59,6 +67,19 @@ falco_common::priority_type falco_common::parse_priority(std::string v) return out; } +bool falco_common::parse_recovery(std::string v, outputs_recovery_type& out) +{ + for (size_t i = 0; i < outputs_recovery_names.size(); i++) + { + if (!strcasecmp(v.c_str(), outputs_recovery_names[i].c_str())) + { + out = (outputs_recovery_type) i; + return true; + } + } + return false; +} + bool falco_common::format_priority(priority_type v, std::string& out, bool 
shortfmt) { if ((size_t) v < priority_names.size()) diff --git a/userspace/engine/falco_common.h b/userspace/engine/falco_common.h index 6c63eec9d42..e0c7ebedc08 100644 --- a/userspace/engine/falco_common.h +++ b/userspace/engine/falco_common.h @@ -21,6 +21,8 @@ limitations under the License. #include #include +#define DEFAULT_OUTPUTS_QUEUE_CAPACITY 0 + // // Most falco_* classes can throw exceptions. Unless directly related // to low-level failures like inability to open file, etc, they will @@ -52,6 +54,13 @@ struct falco_exception : std::exception namespace falco_common { + + enum outputs_recovery_type { + RECOVERY_CONTINUE = 0, /* queue_capacity_outputs recovery strategy of continuing on. */ + RECOVERY_EXIT = 1, /* queue_capacity_outputs recovery strategy of exiting, self OOM kill. */ + RECOVERY_EMPTY = 2, /* queue_capacity_outputs recovery strategy of emptying queue then continuing. */ + }; + const std::string syscall_source = sinsp_syscall_event_source_name; // Same as numbers/indices into the above vector @@ -69,6 +78,7 @@ namespace falco_common bool parse_priority(std::string v, priority_type& out); priority_type parse_priority(std::string v); + bool parse_recovery(std::string v, outputs_recovery_type& out); bool format_priority(priority_type v, std::string& out, bool shortfmt=false); std::string format_priority(priority_type v, bool shortfmt=false); diff --git a/userspace/falco/app/actions/init_outputs.cpp b/userspace/falco/app/actions/init_outputs.cpp index 3f415140d64..7fd32d52b83 100644 --- a/userspace/falco/app/actions/init_outputs.cpp +++ b/userspace/falco/app/actions/init_outputs.cpp @@ -63,8 +63,8 @@ falco::app::run_result falco::app::actions::init_outputs(falco::app::state& s) s.config->m_json_include_tags_property, s.config->m_output_timeout, s.config->m_buffered_outputs, - s.config->m_queue_capacity_outputs_items, - s.config->m_queue_capacity_outputs_recovery, + s.config->m_outputs_queue_capacity, + s.config->m_outputs_queue_recovery, 
s.config->m_time_format_iso_8601, hostname)); diff --git a/userspace/falco/configuration.cpp b/userspace/falco/configuration.cpp index caf945e7fbb..43b10e2bb57 100644 --- a/userspace/falco/configuration.cpp +++ b/userspace/falco/configuration.cpp @@ -28,7 +28,6 @@ limitations under the License. #include "falco_utils.h" #include "configuration.h" -#include "configuration_aux.h" #include "logger.h" #include "banned.h" // This raises a compilation error when certain functions are used @@ -41,8 +40,8 @@ falco_configuration::falco_configuration(): m_watch_config_files(true), m_rule_matching(falco_common::rule_matching::FIRST), m_buffered_outputs(false), - m_queue_capacity_outputs_items(DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS), - m_queue_capacity_outputs_recovery(RECOVERY_EXIT), + m_outputs_queue_capacity(DEFAULT_OUTPUTS_QUEUE_CAPACITY), + m_outputs_queue_recovery(falco_common::RECOVERY_EXIT), m_time_format_iso_8601(false), m_output_timeout(2000), m_grpc_enabled(false), @@ -261,8 +260,13 @@ void falco_configuration::load_yaml(const std::string& config_name, const yaml_h } m_buffered_outputs = config.get_scalar("buffered_outputs", false); - m_queue_capacity_outputs_items = config.get_scalar("queue_capacity_outputs.items", DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS); - m_queue_capacity_outputs_recovery = config.get_scalar("queue_capacity_outputs.recovery", RECOVERY_EXIT); + m_outputs_queue_capacity = config.get_scalar("outputs_queue.capacity", DEFAULT_OUTPUTS_QUEUE_CAPACITY); + std::string recovery = config.get_scalar("outputs_queue.recovery", "exit"); + if (!falco_common::parse_recovery(recovery, m_outputs_queue_recovery)) + { + throw std::logic_error("Unknown recovery \"" + recovery + "\"--must be one of exit, continue, empty"); + } + m_time_format_iso_8601 = config.get_scalar("time_format_iso_8601", false); falco_logger::log_stderr = config.get_scalar("log_stderr", false); diff --git a/userspace/falco/configuration.h b/userspace/falco/configuration.h index 
1f0579c2aaa..86bf813f2b6 100644 --- a/userspace/falco/configuration.h +++ b/userspace/falco/configuration.h @@ -72,8 +72,8 @@ class falco_configuration bool m_watch_config_files; bool m_buffered_outputs; - size_t m_queue_capacity_outputs_items; - uint32_t m_queue_capacity_outputs_recovery; + size_t m_outputs_queue_capacity; + falco_common::outputs_recovery_type m_outputs_queue_recovery; bool m_time_format_iso_8601; uint32_t m_output_timeout; diff --git a/userspace/falco/configuration_aux.h b/userspace/falco/configuration_aux.h deleted file mode 100644 index 31504e30041..00000000000 --- a/userspace/falco/configuration_aux.h +++ /dev/null @@ -1,22 +0,0 @@ -/* -Copyright (C) 2023 The Falco Authors. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -#pragma once - -#define DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS 0 - -enum outputs_recovery_code { - RECOVERY_DROP_CURRENT = 0, /* queue_capacity_outputs recovery strategy of continuing on. */ - RECOVERY_EXIT = 1, /* queue_capacity_outputs recovery strategy of exiting, self OOM kill. */ - RECOVERY_EMPTY = 2, /* queue_capacity_outputs recovery strategy of emptying queue then continuing. */ -}; diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index 18fd04a9fc3..74dc67febc9 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -20,7 +20,6 @@ limitations under the License. 
#include "falco_outputs.h" #include "config_falco.h" -#include "configuration_aux.h" #include "formats.h" #include "logger.h" @@ -47,8 +46,8 @@ falco_outputs::falco_outputs( bool json_include_tags_property, uint32_t timeout, bool buffered, - size_t queue_capacity_outputs_items, - uint32_t queue_capacity_outputs_recovery, + size_t outputs_queue_capacity, + falco_common::outputs_recovery_type outputs_queue_recovery, bool time_format_iso_8601, const std::string& hostname) { @@ -68,12 +67,11 @@ falco_outputs::falco_outputs( } #ifndef __EMSCRIPTEN__ m_worker_thread = std::thread(&falco_outputs::worker, this); - if (queue_capacity_outputs_items > 0) + if (outputs_queue_capacity > 0) { - m_queue.set_capacity(queue_capacity_outputs_items); + m_queue.set_capacity(outputs_queue_capacity); } - - m_recovery = queue_capacity_outputs_recovery; + m_recovery = outputs_queue_recovery; #endif } @@ -284,12 +282,13 @@ inline void falco_outputs::push(const ctrl_msg& cmsg) { switch (m_recovery) { - case RECOVERY_EXIT: + case falco_common::RECOVERY_EXIT: fprintf(stderr, "Fatal error: Output queue out of memory. Exiting ... \n"); exit(EXIT_FAILURE); - case RECOVERY_EMPTY: + case falco_common::RECOVERY_EMPTY: fprintf(stderr, "Output queue out of memory. Empty queue and continue ... \n"); m_queue.empty(); + break; default: fprintf(stderr, "Output queue out of memory. Continue on ... 
\n"); break; diff --git a/userspace/falco/falco_outputs.h b/userspace/falco/falco_outputs.h index f8ca285076f..a60807a0c75 100644 --- a/userspace/falco/falco_outputs.h +++ b/userspace/falco/falco_outputs.h @@ -48,8 +48,8 @@ class falco_outputs bool json_include_tags_property, uint32_t timeout, bool buffered, - size_t queue_capacity_outputs_items, - uint32_t queue_capacity_outputs_recovery, + size_t outputs_queue_capacity, + falco_common::outputs_recovery_type outputs_queue_recovery, bool time_format_iso_8601, const std::string& hostname); diff --git a/userspace/falco/stats_writer.cpp b/userspace/falco/stats_writer.cpp index 897f58d558b..91e2a131089 100644 --- a/userspace/falco/stats_writer.cpp +++ b/userspace/falco/stats_writer.cpp @@ -89,9 +89,9 @@ stats_writer::stats_writer( m_config = config; // capacity and controls should not be relevant for stats outputs, adopt capacity // for completeness, but do not implement config recovery strategies. - if (config->m_queue_capacity_outputs_items > 0) + if (config->m_outputs_queue_capacity > 0) { - m_queue.set_capacity(config->m_queue_capacity_outputs_items); + m_queue.set_capacity(config->m_outputs_queue_capacity); } if (config->m_metrics_enabled) { From 5fa4cce871c7beeee08afb9cace593cf3389cd77 Mon Sep 17 00:00:00 2001 From: Melissa Kilby Date: Fri, 25 Aug 2023 17:43:55 +0000 Subject: [PATCH 04/10] new(metrics): add falco.outputs_queue_num_drops metrics + plus fix rebase leftovers Signed-off-by: Melissa Kilby --- userspace/engine/falco_common.cpp | 6 ++---- userspace/falco/app/actions/process_events.cpp | 4 ++-- userspace/falco/falco_outputs.cpp | 8 ++++++++ userspace/falco/falco_outputs.h | 7 +++++++ userspace/falco/stats_writer.cpp | 8 +++++--- userspace/falco/stats_writer.h | 4 ++-- 6 files changed, 26 insertions(+), 11 deletions(-) diff --git a/userspace/engine/falco_common.cpp b/userspace/engine/falco_common.cpp index 92fddf12e1d..d94306aac1a 100644 --- a/userspace/engine/falco_common.cpp +++ 
b/userspace/engine/falco_common.cpp @@ -27,18 +27,16 @@ static std::vector priority_names = { "Debug" }; -<<<<<<< HEAD static std::vector rule_matching_names = { "first", "all" }; -======= + static std::vector outputs_recovery_names = { "continue", "exit", "empty", - }; ->>>>>>> 92bd5767 (cleanup(outputs): adopt different style for outputs_queue params encodings) +}; bool falco_common::parse_priority(std::string v, priority_type& out) { diff --git a/userspace/falco/app/actions/process_events.cpp b/userspace/falco/app/actions/process_events.cpp index b4a05ef1520..0b21ee08bef 100644 --- a/userspace/falco/app/actions/process_events.cpp +++ b/userspace/falco/app/actions/process_events.cpp @@ -281,7 +281,7 @@ static falco::app::run_result do_inspect( } // for capture mode, the source name can change at every event - stats_collector.collect(inspector, inspector->event_sources()[source_engine_idx], num_evts); + stats_collector.collect(inspector, inspector->event_sources()[source_engine_idx], s.outputs, num_evts); } else { @@ -300,7 +300,7 @@ static falco::app::run_result do_inspect( } // for live mode, the source name is constant - stats_collector.collect(inspector, source, num_evts); + stats_collector.collect(inspector, source, s.outputs, num_evts); } // Reset the timeouts counter, Falco successfully got an event to process diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index 74dc67febc9..ddfa2e8b4bf 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -72,6 +72,7 @@ falco_outputs::falco_outputs( m_queue.set_capacity(outputs_queue_capacity); } m_recovery = outputs_queue_recovery; + m_outputs_queue_num_drops = 0UL; #endif } @@ -286,10 +287,12 @@ inline void falco_outputs::push(const ctrl_msg& cmsg) fprintf(stderr, "Fatal error: Output queue out of memory. Exiting ... 
\n"); exit(EXIT_FAILURE); case falco_common::RECOVERY_EMPTY: + m_outputs_queue_num_drops += m_queue.size(); fprintf(stderr, "Output queue out of memory. Empty queue and continue ... \n"); m_queue.empty(); break; default: + m_outputs_queue_num_drops++; fprintf(stderr, "Output queue out of memory. Continue on ... \n"); break; } @@ -356,3 +359,8 @@ inline void falco_outputs::process_msg(falco::outputs::abstract_output* o, const falco_logger::log(LOG_DEBUG, "Outputs worker received an unknown message type\n"); } } + +uint64_t falco_outputs::get_outputs_queue_num_drops() +{ + return m_outputs_queue_num_drops; +} diff --git a/userspace/falco/falco_outputs.h b/userspace/falco/falco_outputs.h index a60807a0c75..cf6efc3362b 100644 --- a/userspace/falco/falco_outputs.h +++ b/userspace/falco/falco_outputs.h @@ -85,6 +85,12 @@ class falco_outputs */ void reopen_outputs(); + /*! + \brief Return the number of currently dropped events as a result of failed push attempts + into the outputs queue when using `continue` or `empty` recovery strategies. 
+ */ + uint64_t get_outputs_queue_num_drops(); + private: std::unique_ptr m_formats; @@ -113,6 +119,7 @@ class falco_outputs typedef tbb::concurrent_bounded_queue falco_outputs_cbq; falco_outputs_cbq m_queue; uint32_t m_recovery; + uint64_t m_outputs_queue_num_drops; #endif std::thread m_worker_thread; diff --git a/userspace/falco/stats_writer.cpp b/userspace/falco/stats_writer.cpp index 91e2a131089..50aff48558c 100644 --- a/userspace/falco/stats_writer.cpp +++ b/userspace/falco/stats_writer.cpp @@ -217,7 +217,7 @@ stats_writer::collector::collector(const std::shared_ptr& writer) void stats_writer::collector::get_metrics_output_fields_wrapper( nlohmann::json& output_fields, const std::shared_ptr& inspector, uint64_t now, - const std::string& src, uint64_t num_evts, double stats_snapshot_time_delta_sec) + const std::string& src, uint64_t outputs_queue_num_drops, uint64_t num_evts, double stats_snapshot_time_delta_sec) { static const char* all_driver_engines[] = { BPF_ENGINE, KMOD_ENGINE, MODERN_BPF_ENGINE, @@ -234,6 +234,7 @@ void stats_writer::collector::get_metrics_output_fields_wrapper( output_fields["falco.host_boot_ts"] = machine_info->boot_ts_epoch; output_fields["falco.hostname"] = machine_info->hostname; /* Explicitly add hostname to log msg in case hostname rule output field is disabled. 
*/ output_fields["falco.host_num_cpus"] = machine_info->num_cpus; + output_fields["falco.outputs_queue_num_drops"] = outputs_queue_num_drops; output_fields["evt.source"] = src; for (size_t i = 0; i < sizeof(all_driver_engines) / sizeof(const char*); i++) @@ -418,7 +419,7 @@ void stats_writer::collector::get_metrics_output_fields_additional( #endif } -void stats_writer::collector::collect(const std::shared_ptr& inspector, const std::string &src, uint64_t num_evts) +void stats_writer::collector::collect(const std::shared_ptr& inspector, const std::string &src, const std::shared_ptr& outputs, uint64_t num_evts) { if (m_writer->has_output()) { @@ -439,7 +440,8 @@ void stats_writer::collector::collect(const std::shared_ptr& inspector, c /* Get respective metrics output_fields. */ nlohmann::json output_fields; - get_metrics_output_fields_wrapper(output_fields, inspector, now, src, num_evts, stats_snapshot_time_delta_sec); + uint64_t outputs_queue_num_drops = outputs->get_outputs_queue_num_drops(); + get_metrics_output_fields_wrapper(output_fields, inspector, now, src, outputs_queue_num_drops, num_evts, stats_snapshot_time_delta_sec); get_metrics_output_fields_additional(output_fields, inspector, stats_snapshot_time_delta_sec, src); /* Send message in the queue */ diff --git a/userspace/falco/stats_writer.h b/userspace/falco/stats_writer.h index 42a183e7825..4c68d243454 100644 --- a/userspace/falco/stats_writer.h +++ b/userspace/falco/stats_writer.h @@ -60,13 +60,13 @@ class stats_writer \brief Collects one stats sample from an inspector and for the given event source name */ - void collect(const std::shared_ptr& inspector, const std::string& src, uint64_t num_evts); + void collect(const std::shared_ptr& inspector, const std::string& src, const std::shared_ptr& outputs, uint64_t num_evts); private: /*! \brief Collect snapshot metrics wrapper fields as internal rule formatted output fields. 
*/ - void get_metrics_output_fields_wrapper(nlohmann::json& output_fields, const std::shared_ptr& inspector, uint64_t now, const std::string& src, uint64_t num_evts, double stats_snapshot_time_delta_sec); + void get_metrics_output_fields_wrapper(nlohmann::json& output_fields, const std::shared_ptr& inspector, uint64_t now, const std::string& src, uint64_t outputs_queue_num_drops, uint64_t num_evts, double stats_snapshot_time_delta_sec); /*! \brief Collect snapshot metrics syscalls related metrics as internal rule formatted output fields. From c76d9b04d0fdf6bf1a21cc97000fa704b25ff841 Mon Sep 17 00:00:00 2001 From: Melissa Kilby Date: Fri, 25 Aug 2023 18:41:55 +0000 Subject: [PATCH 05/10] cleanup(userspace/falco): always set queue capacity and use largest long as default for unbounded Co-authored-by: Andrea Terzolo Signed-off-by: Melissa Kilby --- userspace/engine/falco_common.h | 6 +++++- userspace/falco/configuration.cpp | 5 +++++ userspace/falco/falco_outputs.cpp | 5 +---- userspace/falco/stats_writer.cpp | 5 +---- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/userspace/engine/falco_common.h b/userspace/engine/falco_common.h index e0c7ebedc08..6ddefad2ca9 100644 --- a/userspace/engine/falco_common.h +++ b/userspace/engine/falco_common.h @@ -21,7 +21,11 @@ limitations under the License. #include #include -#define DEFAULT_OUTPUTS_QUEUE_CAPACITY 0 +// +// equivalent to an "unbounded queue" in TBB terms or largest long value +// https://github.com/oneapi-src/oneTBB/blob/b2474bfc636937052d05daf8b3f4d6b76e20273a/include/oneapi/tbb/concurrent_queue.h#L554 +// +#define DEFAULT_OUTPUTS_QUEUE_CAPACITY std::ptrdiff_t(~size_t(0) / 2) // // Most falco_* classes can throw exceptions. 
Unless directly related diff --git a/userspace/falco/configuration.cpp b/userspace/falco/configuration.cpp index 43b10e2bb57..0d90d36f257 100644 --- a/userspace/falco/configuration.cpp +++ b/userspace/falco/configuration.cpp @@ -261,6 +261,11 @@ void falco_configuration::load_yaml(const std::string& config_name, const yaml_h m_buffered_outputs = config.get_scalar("buffered_outputs", false); m_outputs_queue_capacity = config.get_scalar("outputs_queue.capacity", DEFAULT_OUTPUTS_QUEUE_CAPACITY); + // We use 0 in falco.yaml to indicate an unbounded queue; equivalent to the largest long value + if (m_outputs_queue_capacity == 0) + { + m_outputs_queue_capacity = DEFAULT_OUTPUTS_QUEUE_CAPACITY; + } std::string recovery = config.get_scalar("outputs_queue.recovery", "exit"); if (!falco_common::parse_recovery(recovery, m_outputs_queue_recovery)) { diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index ddfa2e8b4bf..493fb0f930f 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -67,10 +67,7 @@ falco_outputs::falco_outputs( } #ifndef __EMSCRIPTEN__ m_worker_thread = std::thread(&falco_outputs::worker, this); - if (outputs_queue_capacity > 0) - { - m_queue.set_capacity(outputs_queue_capacity); - } + m_queue.set_capacity(outputs_queue_capacity); m_recovery = outputs_queue_recovery; m_outputs_queue_num_drops = 0UL; #endif diff --git a/userspace/falco/stats_writer.cpp b/userspace/falco/stats_writer.cpp index 50aff48558c..5aee9411f02 100644 --- a/userspace/falco/stats_writer.cpp +++ b/userspace/falco/stats_writer.cpp @@ -89,10 +89,7 @@ stats_writer::stats_writer( m_config = config; // capacity and controls should not be relevant for stats outputs, adopt capacity // for completeness, but do not implement config recovery strategies. 
- if (config->m_outputs_queue_capacity > 0) - { - m_queue.set_capacity(config->m_outputs_queue_capacity); - } + m_queue.set_capacity(config->m_outputs_queue_capacity); if (config->m_metrics_enabled) { if (!config->m_metrics_output_file.empty()) From 34ae690641b904d8963ef2e724c71b311f31f93a Mon Sep 17 00:00:00 2001 From: Melissa Kilby Date: Fri, 25 Aug 2023 20:06:37 +0000 Subject: [PATCH 06/10] cleanup: apply reviewers suggestions Co-authored-by: Andrea Terzolo Signed-off-by: Melissa Kilby --- falco.yaml | 19 ++++++++++--------- userspace/engine/falco_common.cpp | 10 +++++----- userspace/engine/falco_common.h | 4 ++-- .../falco/app/actions/process_events.cpp | 4 ++-- userspace/falco/configuration.cpp | 2 +- userspace/falco/configuration.h | 2 +- userspace/falco/falco_outputs.cpp | 2 +- userspace/falco/falco_outputs.h | 4 ++-- userspace/falco/stats_writer.cpp | 9 ++++----- userspace/falco/stats_writer.h | 4 ++-- 10 files changed, 30 insertions(+), 30 deletions(-) diff --git a/falco.yaml b/falco.yaml index c64e9f9e1bd..c235c3cbeba 100644 --- a/falco.yaml +++ b/falco.yaml @@ -318,17 +318,18 @@ rule_matching: first # Lowering the number of items can prevent memory from steadily increasing until the OOM # killer stops the Falco process. We provide recovery actions to self-limit or self-kill # in order to handle this situation earlier, similar to how we expose the kernel buffer size -# as a parameter. -# However, it will not address the root cause of the event pipe not keeping up. +# as a parameter. However, it will not address the root cause of the event pipe not keeping up. # -# `capacity`: the maximum number of items allowed in the queue, defaulting to 0. This means that -# the queue remains unbounded aka this setting is disabled. -# You can experiment with values greater or smaller than the anchor value 1000000. +# `capacity`: the maximum number of items allowed in the queue is determined by this value. 
+# Setting the value to 0 (which is the default) is equivalent to keeping the queue unbounded. +# In other words, when this configuration is set to 0, the number of allowed items is effectively +# set to the largest possible long value, disabling this setting. # -# `recovery`: the strategy to follow when the queue becomes filled up. This also applies when -# the queue is unbounded, and all available memory on the system is consumed. -# `exit` is default, `continue` does nothing special and `empty` empties the queue and then -# continues. +# `recovery`: strategy to follow when the queue becomes filled up. It applies only when the +# queue is bounded and there is still available system memory. In the case of an unbounded +# queue, if the available memory on the system is consumed, the Falco process would be +# OOM killed. The value `exit` is the default, `continue` does nothing special and `empty` +# empties the queue and then continues. outputs_queue: capacity: 0 recovery: exit diff --git a/userspace/engine/falco_common.cpp b/userspace/engine/falco_common.cpp index d94306aac1a..9cf27d636b3 100644 --- a/userspace/engine/falco_common.cpp +++ b/userspace/engine/falco_common.cpp @@ -32,7 +32,7 @@ static std::vector rule_matching_names = { "all" }; -static std::vector outputs_recovery_names = { +static std::vector outputs_queue_recovery_names = { "continue", "exit", "empty", @@ -65,13 +65,13 @@ falco_common::priority_type falco_common::parse_priority(std::string v) return out; } -bool falco_common::parse_recovery(std::string v, outputs_recovery_type& out) +bool falco_common::parse_queue_recovery(std::string v, outputs_queue_recovery_type& out) { - for (size_t i = 0; i < outputs_recovery_names.size(); i++) + for (size_t i = 0; i < outputs_queue_recovery_names.size(); i++) { - if (!strcasecmp(v.c_str(), outputs_recovery_names[i].c_str())) + if (!strcasecmp(v.c_str(), outputs_queue_recovery_names[i].c_str())) { - out = (outputs_recovery_type) i; + out = 
(outputs_queue_recovery_type) i; return true; } } diff --git a/userspace/engine/falco_common.h b/userspace/engine/falco_common.h index 6ddefad2ca9..63af03f0c2b 100644 --- a/userspace/engine/falco_common.h +++ b/userspace/engine/falco_common.h @@ -59,7 +59,7 @@ struct falco_exception : std::exception namespace falco_common { - enum outputs_recovery_type { + enum outputs_queue_recovery_type { RECOVERY_CONTINUE = 0, /* queue_capacity_outputs recovery strategy of continuing on. */ RECOVERY_EXIT = 1, /* queue_capacity_outputs recovery strategy of exiting, self OOM kill. */ RECOVERY_EMPTY = 2, /* queue_capacity_outputs recovery strategy of emptying queue then continuing. */ @@ -82,7 +82,7 @@ namespace falco_common bool parse_priority(std::string v, priority_type& out); priority_type parse_priority(std::string v); - bool parse_recovery(std::string v, outputs_recovery_type& out); + bool parse_queue_recovery(std::string v, outputs_queue_recovery_type& out); bool format_priority(priority_type v, std::string& out, bool shortfmt=false); std::string format_priority(priority_type v, bool shortfmt=false); diff --git a/userspace/falco/app/actions/process_events.cpp b/userspace/falco/app/actions/process_events.cpp index 0b21ee08bef..b4a05ef1520 100644 --- a/userspace/falco/app/actions/process_events.cpp +++ b/userspace/falco/app/actions/process_events.cpp @@ -281,7 +281,7 @@ static falco::app::run_result do_inspect( } // for capture mode, the source name can change at every event - stats_collector.collect(inspector, inspector->event_sources()[source_engine_idx], s.outputs, num_evts); + stats_collector.collect(inspector, inspector->event_sources()[source_engine_idx], num_evts); } else { @@ -300,7 +300,7 @@ static falco::app::run_result do_inspect( } // for live mode, the source name is constant - stats_collector.collect(inspector, source, s.outputs, num_evts); + stats_collector.collect(inspector, source, num_evts); } // Reset the timeouts counter, Falco successfully got an event to 
process diff --git a/userspace/falco/configuration.cpp b/userspace/falco/configuration.cpp index 0d90d36f257..ea8530e3471 100644 --- a/userspace/falco/configuration.cpp +++ b/userspace/falco/configuration.cpp @@ -267,7 +267,7 @@ void falco_configuration::load_yaml(const std::string& config_name, const yaml_h m_outputs_queue_capacity = DEFAULT_OUTPUTS_QUEUE_CAPACITY; } std::string recovery = config.get_scalar("outputs_queue.recovery", "exit"); - if (!falco_common::parse_recovery(recovery, m_outputs_queue_recovery)) + if (!falco_common::parse_queue_recovery(recovery, m_outputs_queue_recovery)) { throw std::logic_error("Unknown recovery \"" + recovery + "\"--must be one of exit, continue, empty"); } diff --git a/userspace/falco/configuration.h b/userspace/falco/configuration.h index 86bf813f2b6..c1a595d0a96 100644 --- a/userspace/falco/configuration.h +++ b/userspace/falco/configuration.h @@ -73,7 +73,7 @@ class falco_configuration bool m_watch_config_files; bool m_buffered_outputs; size_t m_outputs_queue_capacity; - falco_common::outputs_recovery_type m_outputs_queue_recovery; + falco_common::outputs_queue_recovery_type m_outputs_queue_recovery; bool m_time_format_iso_8601; uint32_t m_output_timeout; diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index 493fb0f930f..d2027fa0394 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -47,7 +47,7 @@ falco_outputs::falco_outputs( uint32_t timeout, bool buffered, size_t outputs_queue_capacity, - falco_common::outputs_recovery_type outputs_queue_recovery, + falco_common::outputs_queue_recovery_type outputs_queue_recovery, bool time_format_iso_8601, const std::string& hostname) { diff --git a/userspace/falco/falco_outputs.h b/userspace/falco/falco_outputs.h index cf6efc3362b..3fbf216954d 100644 --- a/userspace/falco/falco_outputs.h +++ b/userspace/falco/falco_outputs.h @@ -49,7 +49,7 @@ class falco_outputs uint32_t timeout, bool buffered, size_t 
outputs_queue_capacity, - falco_common::outputs_recovery_type outputs_queue_recovery, + falco_common::outputs_queue_recovery_type outputs_queue_recovery, bool time_format_iso_8601, const std::string& hostname); @@ -118,7 +118,7 @@ class falco_outputs #ifndef __EMSCRIPTEN__ typedef tbb::concurrent_bounded_queue falco_outputs_cbq; falco_outputs_cbq m_queue; - uint32_t m_recovery; + falco_common::outputs_queue_recovery_type m_recovery; uint64_t m_outputs_queue_num_drops; #endif diff --git a/userspace/falco/stats_writer.cpp b/userspace/falco/stats_writer.cpp index 5aee9411f02..a7de4563c42 100644 --- a/userspace/falco/stats_writer.cpp +++ b/userspace/falco/stats_writer.cpp @@ -214,7 +214,7 @@ stats_writer::collector::collector(const std::shared_ptr& writer) void stats_writer::collector::get_metrics_output_fields_wrapper( nlohmann::json& output_fields, const std::shared_ptr& inspector, uint64_t now, - const std::string& src, uint64_t outputs_queue_num_drops, uint64_t num_evts, double stats_snapshot_time_delta_sec) + const std::string& src, uint64_t num_evts, double stats_snapshot_time_delta_sec) { static const char* all_driver_engines[] = { BPF_ENGINE, KMOD_ENGINE, MODERN_BPF_ENGINE, @@ -231,7 +231,7 @@ void stats_writer::collector::get_metrics_output_fields_wrapper( output_fields["falco.host_boot_ts"] = machine_info->boot_ts_epoch; output_fields["falco.hostname"] = machine_info->hostname; /* Explicitly add hostname to log msg in case hostname rule output field is disabled. 
*/ output_fields["falco.host_num_cpus"] = machine_info->num_cpus; - output_fields["falco.outputs_queue_num_drops"] = outputs_queue_num_drops; + output_fields["falco.outputs_queue_num_drops"] = m_writer->m_outputs->get_outputs_queue_num_drops(); output_fields["evt.source"] = src; for (size_t i = 0; i < sizeof(all_driver_engines) / sizeof(const char*); i++) @@ -416,7 +416,7 @@ void stats_writer::collector::get_metrics_output_fields_additional( #endif } -void stats_writer::collector::collect(const std::shared_ptr& inspector, const std::string &src, const std::shared_ptr& outputs, uint64_t num_evts) +void stats_writer::collector::collect(const std::shared_ptr& inspector, const std::string &src, uint64_t num_evts) { if (m_writer->has_output()) { @@ -437,8 +437,7 @@ void stats_writer::collector::collect(const std::shared_ptr& inspector, c /* Get respective metrics output_fields. */ nlohmann::json output_fields; - uint64_t outputs_queue_num_drops = outputs->get_outputs_queue_num_drops(); - get_metrics_output_fields_wrapper(output_fields, inspector, now, src, outputs_queue_num_drops, num_evts, stats_snapshot_time_delta_sec); + get_metrics_output_fields_wrapper(output_fields, inspector, now, src, num_evts, stats_snapshot_time_delta_sec); get_metrics_output_fields_additional(output_fields, inspector, stats_snapshot_time_delta_sec, src); /* Send message in the queue */ diff --git a/userspace/falco/stats_writer.h b/userspace/falco/stats_writer.h index 4c68d243454..42a183e7825 100644 --- a/userspace/falco/stats_writer.h +++ b/userspace/falco/stats_writer.h @@ -60,13 +60,13 @@ class stats_writer \brief Collects one stats sample from an inspector and for the given event source name */ - void collect(const std::shared_ptr& inspector, const std::string& src, const std::shared_ptr& outputs, uint64_t num_evts); + void collect(const std::shared_ptr& inspector, const std::string& src, uint64_t num_evts); private: /*! 
\brief Collect snapshot metrics wrapper fields as internal rule formatted output fields. */ - void get_metrics_output_fields_wrapper(nlohmann::json& output_fields, const std::shared_ptr& inspector, uint64_t now, const std::string& src, uint64_t outputs_queue_num_drops, uint64_t num_evts, double stats_snapshot_time_delta_sec); + void get_metrics_output_fields_wrapper(nlohmann::json& output_fields, const std::shared_ptr& inspector, uint64_t now, const std::string& src, uint64_t num_evts, double stats_snapshot_time_delta_sec); /*! \brief Collect snapshot metrics syscalls related metrics as internal rule formatted output fields. From f5db79c470485f5060fff130fe7dea84682eb9a7 Mon Sep 17 00:00:00 2001 From: Melissa Kilby Date: Fri, 25 Aug 2023 21:43:05 +0000 Subject: [PATCH 07/10] cleanup: apply more reviewers suggestions Co-authored-by: Andrea Terzolo Co-authored-by: Leonardo Grasso Signed-off-by: Melissa Kilby --- userspace/engine/falco_common.h | 6 +++--- userspace/falco/falco_outputs.cpp | 20 ++++++++++++-------- userspace/falco/falco_outputs.h | 4 ++-- userspace/falco/stats_writer.cpp | 6 +++--- 4 files changed, 20 insertions(+), 16 deletions(-) diff --git a/userspace/engine/falco_common.h b/userspace/engine/falco_common.h index 63af03f0c2b..b62c24ff1c3 100644 --- a/userspace/engine/falco_common.h +++ b/userspace/engine/falco_common.h @@ -60,9 +60,9 @@ namespace falco_common { enum outputs_queue_recovery_type { - RECOVERY_CONTINUE = 0, /* queue_capacity_outputs recovery strategy of continuing on. */ - RECOVERY_EXIT = 1, /* queue_capacity_outputs recovery strategy of exiting, self OOM kill. */ - RECOVERY_EMPTY = 2, /* queue_capacity_outputs recovery strategy of emptying queue then continuing. */ + RECOVERY_CONTINUE = 0, /* outputs_queue_capacity recovery strategy of continuing on. */ + RECOVERY_EXIT = 1, /* outputs_queue_capacity recovery strategy of exiting, self OOM kill. 
*/ + RECOVERY_EMPTY = 2, /* outputs_queue_capacity recovery strategy of emptying queue then continuing. */ }; const std::string syscall_source = sinsp_syscall_event_source_name; diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index d2027fa0394..234f5ce26ea 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -65,11 +65,11 @@ falco_outputs::falco_outputs( { add_output(output); } + m_outputs_queue_num_drops = 0UL; + m_outputs_queue_recovery = outputs_queue_recovery; #ifndef __EMSCRIPTEN__ - m_worker_thread = std::thread(&falco_outputs::worker, this); m_queue.set_capacity(outputs_queue_capacity); - m_recovery = outputs_queue_recovery; - m_outputs_queue_num_drops = 0UL; + m_worker_thread = std::thread(&falco_outputs::worker, this); #endif } @@ -278,19 +278,23 @@ inline void falco_outputs::push(const ctrl_msg& cmsg) #ifndef __EMSCRIPTEN__ if (!m_queue.try_push(cmsg)) { - switch (m_recovery) + switch (m_outputs_queue_recovery) { case falco_common::RECOVERY_EXIT: - fprintf(stderr, "Fatal error: Output queue out of memory. Exiting ... \n"); + falco_logger::log(LOG_ERR, "Fatal error: Output queue out of memory. Exiting ..."); exit(EXIT_FAILURE); case falco_common::RECOVERY_EMPTY: m_outputs_queue_num_drops += m_queue.size(); - fprintf(stderr, "Output queue out of memory. Empty queue and continue ... \n"); + falco_logger::log(LOG_ERR, "Output queue out of memory. Dropping events in queue due to emptying the queue and continue on ..."); m_queue.empty(); break; - default: + case falco_common::RECOVERY_CONTINUE: m_outputs_queue_num_drops++; - fprintf(stderr, "Output queue out of memory. Continue on ... \n"); + falco_logger::log(LOG_ERR, "Output queue out of memory. Drop event and continue on ..."); + break; + default: + falco_logger::log(LOG_ERR, "Fatal error: strategy unknown. 
Exiting ..."); + exit(EXIT_FAILURE); break; } } diff --git a/userspace/falco/falco_outputs.h b/userspace/falco/falco_outputs.h index 3fbf216954d..5549208502c 100644 --- a/userspace/falco/falco_outputs.h +++ b/userspace/falco/falco_outputs.h @@ -118,10 +118,10 @@ class falco_outputs #ifndef __EMSCRIPTEN__ typedef tbb::concurrent_bounded_queue falco_outputs_cbq; falco_outputs_cbq m_queue; - falco_common::outputs_queue_recovery_type m_recovery; - uint64_t m_outputs_queue_num_drops; #endif + falco_common::outputs_queue_recovery_type m_outputs_queue_recovery; + uint64_t m_outputs_queue_num_drops; std::thread m_worker_thread; inline void push(const ctrl_msg& cmsg); inline void push_ctrl(ctrl_msg_type cmt); diff --git a/userspace/falco/stats_writer.cpp b/userspace/falco/stats_writer.cpp index a7de4563c42..c4d597aa571 100644 --- a/userspace/falco/stats_writer.cpp +++ b/userspace/falco/stats_writer.cpp @@ -87,9 +87,6 @@ stats_writer::stats_writer( : m_initialized(false), m_total_samples(0) { m_config = config; - // capacity and controls should not be relevant for stats outputs, adopt capacity - // for completeness, but do not implement config recovery strategies. - m_queue.set_capacity(config->m_outputs_queue_capacity); if (config->m_metrics_enabled) { if (!config->m_metrics_output_file.empty()) @@ -109,6 +106,9 @@ stats_writer::stats_writer( if (m_initialized) { #ifndef __EMSCRIPTEN__ + // capacity and controls should not be relevant for stats outputs, adopt capacity + // for completeness, but do not implement config recovery strategies. 
+ m_queue.set_capacity(config->m_outputs_queue_capacity); m_worker = std::thread(&stats_writer::worker, this); #endif } From 4dedc74e38b77146eb9a59bc8441ba5a5733dc69 Mon Sep 17 00:00:00 2001 From: Melissa Kilby Date: Mon, 28 Aug 2023 03:57:18 +0000 Subject: [PATCH 08/10] cleanup(config): rename default outputs queue macro Signed-off-by: Melissa Kilby --- userspace/engine/falco_common.h | 2 +- userspace/falco/configuration.cpp | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/userspace/engine/falco_common.h b/userspace/engine/falco_common.h index b62c24ff1c3..acc1030eb60 100644 --- a/userspace/engine/falco_common.h +++ b/userspace/engine/falco_common.h @@ -25,7 +25,7 @@ limitations under the License. // equivalent to an "unbounded queue" in TBB terms or largest long value // https://github.com/oneapi-src/oneTBB/blob/b2474bfc636937052d05daf8b3f4d6b76e20273a/include/oneapi/tbb/concurrent_queue.h#L554 // -#define DEFAULT_OUTPUTS_QUEUE_CAPACITY std::ptrdiff_t(~size_t(0) / 2) +#define DEFAULT_OUTPUTS_QUEUE_CAPACITY_UNBOUNDED_MAX_LONG_VALUE std::ptrdiff_t(~size_t(0) / 2) // // Most falco_* classes can throw exceptions. 
Unless directly related diff --git a/userspace/falco/configuration.cpp b/userspace/falco/configuration.cpp index ea8530e3471..1f09e582b26 100644 --- a/userspace/falco/configuration.cpp +++ b/userspace/falco/configuration.cpp @@ -40,7 +40,7 @@ falco_configuration::falco_configuration(): m_watch_config_files(true), m_rule_matching(falco_common::rule_matching::FIRST), m_buffered_outputs(false), - m_outputs_queue_capacity(DEFAULT_OUTPUTS_QUEUE_CAPACITY), + m_outputs_queue_capacity(DEFAULT_OUTPUTS_QUEUE_CAPACITY_UNBOUNDED_MAX_LONG_VALUE), m_outputs_queue_recovery(falco_common::RECOVERY_EXIT), m_time_format_iso_8601(false), m_output_timeout(2000), @@ -260,11 +260,11 @@ void falco_configuration::load_yaml(const std::string& config_name, const yaml_h } m_buffered_outputs = config.get_scalar("buffered_outputs", false); - m_outputs_queue_capacity = config.get_scalar("outputs_queue.capacity", DEFAULT_OUTPUTS_QUEUE_CAPACITY); + m_outputs_queue_capacity = config.get_scalar("outputs_queue.capacity", DEFAULT_OUTPUTS_QUEUE_CAPACITY_UNBOUNDED_MAX_LONG_VALUE); // We use 0 in falco.yaml to indicate an unbounded queue; equivalent to the largest long value if (m_outputs_queue_capacity == 0) { - m_outputs_queue_capacity = DEFAULT_OUTPUTS_QUEUE_CAPACITY; + m_outputs_queue_capacity = DEFAULT_OUTPUTS_QUEUE_CAPACITY_UNBOUNDED_MAX_LONG_VALUE; } std::string recovery = config.get_scalar("outputs_queue.recovery", "exit"); if (!falco_common::parse_queue_recovery(recovery, m_outputs_queue_recovery)) From 78eaf0feff1655909213068ead2d90b35abf112b Mon Sep 17 00:00:00 2001 From: Melissa Kilby Date: Tue, 5 Sep 2023 20:58:36 +0000 Subject: [PATCH 09/10] fix(userspace/falco): change outputs_queue_num_drops to atomic Co-authored-by: Jason Dellaluce Signed-off-by: Melissa Kilby --- userspace/falco/falco_outputs.cpp | 4 ++-- userspace/falco/falco_outputs.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index 
234f5ce26ea..68300ebd860 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -65,7 +65,7 @@ falco_outputs::falco_outputs( { add_output(output); } - m_outputs_queue_num_drops = 0UL; + m_outputs_queue_num_drops = {0}; m_outputs_queue_recovery = outputs_queue_recovery; #ifndef __EMSCRIPTEN__ m_queue.set_capacity(outputs_queue_capacity); @@ -363,5 +363,5 @@ inline void falco_outputs::process_msg(falco::outputs::abstract_output* o, const uint64_t falco_outputs::get_outputs_queue_num_drops() { - return m_outputs_queue_num_drops; + return m_outputs_queue_num_drops.load(); } diff --git a/userspace/falco/falco_outputs.h b/userspace/falco/falco_outputs.h index 5549208502c..45d9314166c 100644 --- a/userspace/falco/falco_outputs.h +++ b/userspace/falco/falco_outputs.h @@ -121,7 +121,7 @@ class falco_outputs #endif falco_common::outputs_queue_recovery_type m_outputs_queue_recovery; - uint64_t m_outputs_queue_num_drops; + std::atomic m_outputs_queue_num_drops; std::thread m_worker_thread; inline void push(const ctrl_msg& cmsg); inline void push_ctrl(ctrl_msg_type cmt); From c9b5f88ee75fd86ae4bdfeeff09e0128fae4206a Mon Sep 17 00:00:00 2001 From: Melissa Kilby Date: Tue, 5 Sep 2023 21:06:03 +0000 Subject: [PATCH 10/10] cleanup(userspace/falco): adjust outputs_queue_num_drops counter for recovery 'empty' Signed-off-by: Melissa Kilby --- userspace/falco/falco_outputs.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index 68300ebd860..9425e5ad2bb 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -284,8 +284,8 @@ inline void falco_outputs::push(const ctrl_msg& cmsg) falco_logger::log(LOG_ERR, "Fatal error: Output queue out of memory. Exiting ..."); exit(EXIT_FAILURE); case falco_common::RECOVERY_EMPTY: - m_outputs_queue_num_drops += m_queue.size(); - falco_logger::log(LOG_ERR, "Output queue out of memory. 
Dropping events in queue due to emptying the queue and continue on ..."); + m_outputs_queue_num_drops += m_queue.size() + 1; + falco_logger::log(LOG_ERR, "Output queue out of memory. Drop event plus events in queue due to emptying the queue; continue on ..."); m_queue.empty(); break; case falco_common::RECOVERY_CONTINUE: