Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(outputs): expose outputs_queue config for memory control #2711

Merged
merged 10 commits into from
Sep 7, 2023
Merged
30 changes: 30 additions & 0 deletions falco.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
# json_include_tags_property
# buffered_outputs
# outputs (throttling)
# rule_matching
# outputs_queue
# Falco outputs channels
# stdout_output
# syslog_output
Expand Down Expand Up @@ -304,6 +306,34 @@ outputs:
# defined.
rule_matching: first

# [Experimental] `outputs_queue`
#
# Falco utilizes tbb::concurrent_bounded_queue for handling outputs, and this parameter
# allows you to customize the queue capacity. Please refer to the official documentation:
# https://oneapi-src.github.io/oneTBB/main/tbb_userguide/Concurrent_Queue_Classes.html.
# On a healthy system with optimized Falco rules, the queue should not fill up.
# If it does, it is most likely happening due to the entire event flow being too slow,
# indicating that the server is under heavy load.
#
# Lowering the number of items can prevent memory from steadily increasing until the OOM
# killer stops the Falco process. We provide recovery actions to self-limit or self-kill
# in order to handle this situation earlier, similar to how we expose the kernel buffer size
# as a parameter. However, it will not address the root cause of the event pipe not keeping up.
#
# `capacity`: the maximum number of items allowed in the queue is determined by this value.
# Setting the value to 0 (which is the default) is equivalent to keeping the queue unbounded.
# In other words, when this configuration is set to 0, the number of allowed items is effectively
# set to the largest possible long value, disabling this setting.
#
# `recovery`: strategy to follow when the queue becomes filled up. It applies only when the
# queue is bounded and there is still available system memory. In the case of an unbounded
# queue, if the available memory on the system is consumed, the Falco process would be
# OOM killed. The value `exit` is the default, `continue` does nothing special and `empty`
# empties the queue and then continues.
outputs_queue:
capacity: 0
incertum marked this conversation as resolved.
Show resolved Hide resolved
recovery: exit


##########################
# Falco outputs channels #
Expand Down
19 changes: 19 additions & 0 deletions userspace/engine/falco_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ static std::vector<std::string> rule_matching_names = {
"all"
};

static std::vector<std::string> outputs_queue_recovery_names = {
"continue",
"exit",
"empty",
};

bool falco_common::parse_priority(std::string v, priority_type& out)
{
for (size_t i = 0; i < priority_names.size(); i++)
Expand Down Expand Up @@ -59,6 +65,19 @@ falco_common::priority_type falco_common::parse_priority(std::string v)
return out;
}

bool falco_common::parse_queue_recovery(std::string v, outputs_queue_recovery_type& out)
{
for (size_t i = 0; i < outputs_queue_recovery_names.size(); i++)
{
if (!strcasecmp(v.c_str(), outputs_queue_recovery_names[i].c_str()))
{
out = (outputs_queue_recovery_type) i;
return true;
}
}
return false;
}

bool falco_common::format_priority(priority_type v, std::string& out, bool shortfmt)
{
if ((size_t) v < priority_names.size())
Expand Down
14 changes: 14 additions & 0 deletions userspace/engine/falco_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ limitations under the License.
#include <mutex>
#include <sinsp.h>

//
// equivalent to an "unbounded queue" in TBB terms or largest long value
// https://github.com/oneapi-src/oneTBB/blob/b2474bfc636937052d05daf8b3f4d6b76e20273a/include/oneapi/tbb/concurrent_queue.h#L554
//
#define DEFAULT_OUTPUTS_QUEUE_CAPACITY_UNBOUNDED_MAX_LONG_VALUE std::ptrdiff_t(~size_t(0) / 2)

//
// Most falco_* classes can throw exceptions. Unless directly related
// to low-level failures like inability to open file, etc, they will
Expand Down Expand Up @@ -52,6 +58,13 @@ struct falco_exception : std::exception

namespace falco_common
{

enum outputs_queue_recovery_type {
RECOVERY_CONTINUE = 0, /* outputs_queue_capacity recovery strategy of continuing on. */
RECOVERY_EXIT = 1, /* outputs_queue_capacity recovery strategy of exiting, self OOM kill. */
RECOVERY_EMPTY = 2, /* outputs_queue_capacity recovery strategy of emptying queue then continuing. */
};

const std::string syscall_source = sinsp_syscall_event_source_name;

// Same as numbers/indices into the above vector
Expand All @@ -69,6 +82,7 @@ namespace falco_common

bool parse_priority(std::string v, priority_type& out);
priority_type parse_priority(std::string v);
bool parse_queue_recovery(std::string v, outputs_queue_recovery_type& out);
bool format_priority(priority_type v, std::string& out, bool shortfmt=false);
std::string format_priority(priority_type v, bool shortfmt=false);

Expand Down
2 changes: 2 additions & 0 deletions userspace/falco/app/actions/init_outputs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ falco::app::run_result falco::app::actions::init_outputs(falco::app::state& s)
s.config->m_json_include_tags_property,
s.config->m_output_timeout,
s.config->m_buffered_outputs,
s.config->m_outputs_queue_capacity,
incertum marked this conversation as resolved.
Show resolved Hide resolved
s.config->m_outputs_queue_recovery,
s.config->m_time_format_iso_8601,
hostname));

Expand Down
14 changes: 14 additions & 0 deletions userspace/falco/configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ falco_configuration::falco_configuration():
m_watch_config_files(true),
m_rule_matching(falco_common::rule_matching::FIRST),
m_buffered_outputs(false),
m_outputs_queue_capacity(DEFAULT_OUTPUTS_QUEUE_CAPACITY_UNBOUNDED_MAX_LONG_VALUE),
m_outputs_queue_recovery(falco_common::RECOVERY_EXIT),
m_time_format_iso_8601(false),
m_output_timeout(2000),
m_grpc_enabled(false),
Expand Down Expand Up @@ -258,6 +260,18 @@ void falco_configuration::load_yaml(const std::string& config_name, const yaml_h
}

m_buffered_outputs = config.get_scalar<bool>("buffered_outputs", false);
m_outputs_queue_capacity = config.get_scalar<size_t>("outputs_queue.capacity", DEFAULT_OUTPUTS_QUEUE_CAPACITY_UNBOUNDED_MAX_LONG_VALUE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use an int64_t instead of size_t? in this way, the DEFAULT_OUTPUTS_QUEUE_CAPACITY could be simply be -1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, at the same time always preferring to stay close to the framework, here TBB, and they use size_t. Let me know if its a [nit] or if we significantly gain from this change, then I'll do it.

// We use 0 in falco.yaml to indicate an unbounded queue; equivalent to the largest long value
if (m_outputs_queue_capacity == 0)
{
m_outputs_queue_capacity = DEFAULT_OUTPUTS_QUEUE_CAPACITY_UNBOUNDED_MAX_LONG_VALUE;
}
std::string recovery = config.get_scalar<std::string>("outputs_queue.recovery", "exit");
if (!falco_common::parse_queue_recovery(recovery, m_outputs_queue_recovery))
{
throw std::logic_error("Unknown recovery \"" + recovery + "\"--must be one of exit, continue, empty");
}

m_time_format_iso_8601 = config.get_scalar<bool>("time_format_iso_8601", false);

falco_logger::log_stderr = config.get_scalar<bool>("log_stderr", false);
Expand Down
2 changes: 2 additions & 0 deletions userspace/falco/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class falco_configuration

bool m_watch_config_files;
bool m_buffered_outputs;
size_t m_outputs_queue_capacity;
falco_common::outputs_queue_recovery_type m_outputs_queue_recovery;
bool m_time_format_iso_8601;
uint32_t m_output_timeout;

Expand Down
32 changes: 29 additions & 3 deletions userspace/falco/falco_outputs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ limitations under the License.
#endif

#include "falco_outputs.h"

#include "config_falco.h"

#include "formats.h"
Expand Down Expand Up @@ -47,6 +46,8 @@ falco_outputs::falco_outputs(
bool json_include_tags_property,
uint32_t timeout,
bool buffered,
size_t outputs_queue_capacity,
falco_common::outputs_queue_recovery_type outputs_queue_recovery,
bool time_format_iso_8601,
const std::string& hostname)
{
Expand All @@ -64,7 +65,10 @@ falco_outputs::falco_outputs(
{
add_output(output);
}
m_outputs_queue_num_drops = {0};
m_outputs_queue_recovery = outputs_queue_recovery;
#ifndef __EMSCRIPTEN__
m_queue.set_capacity(outputs_queue_capacity);
m_worker_thread = std::thread(&falco_outputs::worker, this);
#endif
}
Expand Down Expand Up @@ -274,8 +278,25 @@ inline void falco_outputs::push(const ctrl_msg& cmsg)
#ifndef __EMSCRIPTEN__
if (!m_queue.try_push(cmsg))
{
fprintf(stderr, "Fatal error: Output queue reached maximum capacity. Exiting.\n");
exit(EXIT_FAILURE);
switch (m_outputs_queue_recovery)
{
case falco_common::RECOVERY_EXIT:
falco_logger::log(LOG_ERR, "Fatal error: Output queue out of memory. Exiting ...");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not throwing an exception here? It would cause one open event source thread to be stopped (and in cascade, all the others open in parallel), and Falco to terminate gracefully.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good will do!

exit(EXIT_FAILURE);
case falco_common::RECOVERY_EMPTY:
m_outputs_queue_num_drops += m_queue.size() + 1;
falco_logger::log(LOG_ERR, "Output queue out of memory. Drop event plus events in queue due to emptying the queue; continue on ...");
m_queue.empty();
break;
case falco_common::RECOVERY_CONTINUE:
m_outputs_queue_num_drops++;
falco_logger::log(LOG_ERR, "Output queue out of memory. Drop event and continue on ...");
break;
default:
falco_logger::log(LOG_ERR, "Fatal error: strategy unknown. Exiting ...");
exit(EXIT_FAILURE);
break;
}
}
#else
for (auto o : m_outputs)
Expand Down Expand Up @@ -339,3 +360,8 @@ inline void falco_outputs::process_msg(falco::outputs::abstract_output* o, const
falco_logger::log(LOG_DEBUG, "Outputs worker received an unknown message type\n");
}
}

uint64_t falco_outputs::get_outputs_queue_num_drops()
{
return m_outputs_queue_num_drops.load();
}
10 changes: 10 additions & 0 deletions userspace/falco/falco_outputs.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class falco_outputs
bool json_include_tags_property,
uint32_t timeout,
bool buffered,
size_t outputs_queue_capacity,
falco_common::outputs_queue_recovery_type outputs_queue_recovery,
bool time_format_iso_8601,
const std::string& hostname);

Expand Down Expand Up @@ -83,6 +85,12 @@ class falco_outputs
*/
void reopen_outputs();

/*!
\brief Return the number of currently dropped events as a result of failed push attempts
into the outputs queue when using `continue` or `empty` recovery strategies.
*/
uint64_t get_outputs_queue_num_drops();

private:
std::unique_ptr<falco_formats> m_formats;

Expand Down Expand Up @@ -112,6 +120,8 @@ class falco_outputs
falco_outputs_cbq m_queue;
#endif

falco_common::outputs_queue_recovery_type m_outputs_queue_recovery;
std::atomic<uint64_t> m_outputs_queue_num_drops;
std::thread m_worker_thread;
inline void push(const ctrl_msg& cmsg);
inline void push_ctrl(ctrl_msg_type cmt);
Expand Down
4 changes: 4 additions & 0 deletions userspace/falco/stats_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ stats_writer::stats_writer(
if (m_initialized)
{
#ifndef __EMSCRIPTEN__
// capacity and controls should not be relevant for stats outputs, adopt capacity
// for completeness, but do not implement config recovery strategies.
m_queue.set_capacity(config->m_outputs_queue_capacity);
m_worker = std::thread(&stats_writer::worker, this);
#endif
}
Expand Down Expand Up @@ -228,6 +231,7 @@ void stats_writer::collector::get_metrics_output_fields_wrapper(
output_fields["falco.host_boot_ts"] = machine_info->boot_ts_epoch;
output_fields["falco.hostname"] = machine_info->hostname; /* Explicitly add hostname to log msg in case hostname rule output field is disabled. */
output_fields["falco.host_num_cpus"] = machine_info->num_cpus;
output_fields["falco.outputs_queue_num_drops"] = m_writer->m_outputs->get_outputs_queue_num_drops();

output_fields["evt.source"] = src;
for (size_t i = 0; i < sizeof(all_driver_engines) / sizeof(const char*); i++)
Expand Down
Loading