Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(userspace/falco)!: make output rate limiter optional and output engine explicitly thread-safe #2139

Merged
merged 6 commits into from
Aug 23, 2022
15 changes: 9 additions & 6 deletions falco.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,22 @@ syscall_event_timeouts:
output_timeout: 2000

# A throttling mechanism implemented as a token bucket limits the
# rate of falco notifications. This throttling is controlled by the following configuration
# options:
# rate of Falco notifications. One rate limiter is assigned to each event
# source, so that alerts coming from one can't influence the throttling
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are already talking about multiple event sources; isn't it premature here? :D

# mechanism of the others. This is controlled by the following options:
# - rate: the number of tokens (i.e. right to send a notification)
# gained per second. Defaults to 1.
# gained per second. When 0, the throttling mechanism is disabled.
# Defaults to 0.
# - max_burst: the maximum number of tokens outstanding. Defaults to 1000.
#
# With these defaults, falco could send up to 1000 notifications after
# an initial quiet period, and then up to 1 notification per second
# With these defaults, the throttling mechanism is disabled.
# For example, by setting rate to 1 Falco could send up to 1000 notifications
# after an initial quiet period, and then up to 1 notification per second
# afterward. It would gain the full burst back after 1000 seconds of
# no activity.

outputs:
rate: 1
rate: 0
max_burst: 1000

# Where security notifications should go.
Expand Down
24 changes: 10 additions & 14 deletions userspace/falco/app_actions/init_outputs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,16 @@ application::run_result application::init_outputs()
hostname = c_hostname;
}

m_state->outputs->init(m_state->engine,
m_state->config->m_json_output,
m_state->config->m_json_include_output_property,
m_state->config->m_json_include_tags_property,
m_state->config->m_output_timeout,
m_state->config->m_notifications_rate, m_state->config->m_notifications_max_burst,
m_state->config->m_buffered_outputs,
m_state->config->m_time_format_iso_8601,
hostname);

for(auto output : m_state->config->m_outputs)
{
m_state->outputs->add_output(output);
}
m_state->outputs.reset(new falco_outputs(
m_state->engine,
m_state->config->m_outputs,
m_state->config->m_json_output,
m_state->config->m_json_include_output_property,
m_state->config->m_json_include_tags_property,
m_state->config->m_output_timeout,
m_state->config->m_buffered_outputs,
m_state->config->m_time_format_iso_8601,
hostname));

return run_result::ok();
}
23 changes: 21 additions & 2 deletions userspace/falco/app_actions/process_events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ limitations under the License.
#endif
#include "statsfilewriter.h"
#include "application.h"
#include "falco_outputs.h"
#include "token_bucket.h"

#include <plugin_manager.h>

Expand All @@ -47,11 +49,21 @@ application::run_result application::do_inspect(syscall_evt_drop_mgr &sdropmgr,
uint32_t timeouts_since_last_success_or_msg = 0;
std::size_t source_idx;
bool source_idx_found = false;
token_bucket rate_limiter;
bool rate_limiter_enabled = m_state->config->m_notifications_rate > 0;

// if enabled, init rate limiter
if (rate_limiter_enabled)
{
rate_limiter.init(
m_state->config->m_notifications_rate,
m_state->config->m_notifications_max_burst);
}

num_evts = 0;

sdropmgr.init(m_state->inspector,
m_state->outputs,
m_state->outputs, // drop manager has its own rate limiting logic
m_state->config->m_syscall_evt_drop_actions,
m_state->config->m_syscall_evt_drop_threshold,
m_state->config->m_syscall_evt_drop_rate,
Expand Down Expand Up @@ -184,7 +196,14 @@ application::run_result application::do_inspect(syscall_evt_drop_mgr &sdropmgr,
unique_ptr<falco_engine::rule_result> res = m_state->engine->process_event(source_idx, ev);
if(res)
{
m_state->outputs->handle_event(res->evt, res->rule, res->source, res->priority_num, res->format, res->tags);
if (!rate_limiter_enabled || rate_limiter.claim())
{
m_state->outputs->handle_event(res->evt, res->rule, res->source, res->priority_num, res->format, res->tags);
}
else
{
falco_logger::log(LOG_DEBUG, "Skipping rate-limited notification for rule " + res->rule + "\n");
}
}

num_evts++;
Expand Down
2 changes: 1 addition & 1 deletion userspace/falco/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ application::state::state()
enabled_sources({falco_common::syscall_source})
{
config = std::make_shared<falco_configuration>();
outputs = std::make_shared<falco_outputs>();
engine = std::make_shared<falco_engine>();
inspector = std::make_shared<sinsp>();
outputs = nullptr;
}

application::state::~state()
Expand Down
2 changes: 1 addition & 1 deletion userspace/falco/configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ void falco_configuration::init(string conf_filename, const vector<string> &cmdli

m_output_timeout = m_config->get_scalar<uint32_t>("output_timeout", 2000);

m_notifications_rate = m_config->get_scalar<uint32_t>("outputs.rate", 1);
m_notifications_rate = m_config->get_scalar<uint32_t>("outputs.rate", 0);
m_notifications_max_burst = m_config->get_scalar<uint32_t>("outputs.max_burst", 1000);

string priority = m_config->get_scalar<string>("priority", "debug");
Expand Down
78 changes: 26 additions & 52 deletions userspace/falco/falco_outputs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,69 +39,49 @@ limitations under the License.

using namespace std;

falco_outputs::falco_outputs():
m_initialized(false),
m_buffered(true),
m_json_output(false),
m_time_format_iso_8601(false),
m_hostname("")
static const char* s_internal_source = "internal";

falco_outputs::falco_outputs(
std::shared_ptr<falco_engine> engine,
const std::vector<falco::outputs::config>& outputs,
bool json_output,
bool json_include_output_property,
bool json_include_tags_property,
uint32_t timeout,
bool buffered,
bool time_format_iso_8601,
std::string hostname)
{
}

falco_outputs::~falco_outputs()
{
if(m_initialized)
{
this->stop_worker();
for(auto o : m_outputs)
{
delete o;
}
}
}

void falco_outputs::init(std::shared_ptr<falco_engine> engine,
bool json_output,
bool json_include_output_property,
bool json_include_tags_property,
uint32_t timeout,
uint32_t rate, uint32_t max_burst, bool buffered,
bool time_format_iso_8601, std::string hostname)
{
// Cannot be initialized more than one time.
if(m_initialized)
{
throw falco_exception("falco_outputs already initialized");
}

m_formats.reset(new falco_formats(engine, json_include_output_property, json_include_tags_property));

m_json_output = json_output;

m_timeout = std::chrono::milliseconds(timeout);

m_notifications_tb.init(rate, max_burst);

m_buffered = buffered;
m_time_format_iso_8601 = time_format_iso_8601;
m_hostname = hostname;

m_worker_thread = std::thread(&falco_outputs::worker, this);
for(const auto& output : outputs)
{
add_output(output);
}

m_initialized = true;
m_worker_thread = std::thread(&falco_outputs::worker, this);
}

// This function has to be called after init() since some configuration settings
// need to be passed to the output plugins. Then, although the worker has started,
// the worker is still on hold, waiting for a message.
// Thus it is still safe to call add_output() before any message has been enqueued.
void falco_outputs::add_output(falco::outputs::config oc)
falco_outputs::~falco_outputs()
{
if(!m_initialized)
this->stop_worker();
for(auto o : m_outputs)
{
throw falco_exception("cannot add output: falco_outputs not initialized yet");
delete o;
}
}

// This function is called only at initialization-time by the constructor
void falco_outputs::add_output(falco::outputs::config oc)
{
falco::outputs::abstract_output *oo;

if(oc.name == "file")
Expand Down Expand Up @@ -142,12 +122,6 @@ void falco_outputs::add_output(falco::outputs::config oc)
void falco_outputs::handle_event(gen_event *evt, string &rule, string &source,
falco_common::priority_type priority, string &format, std::set<std::string> &tags)
{
if(!m_notifications_tb.claim())
{
falco_logger::log(LOG_DEBUG, "Skipping rate-limited notification for rule " + rule + "\n");
return;
}

falco_outputs::ctrl_msg cmsg = {};
cmsg.ts = evt->get_ts();
cmsg.priority = priority;
Expand Down Expand Up @@ -192,7 +166,7 @@ void falco_outputs::handle_msg(uint64_t ts,
falco_outputs::ctrl_msg cmsg = {};
cmsg.ts = ts;
cmsg.priority = priority;
cmsg.source = "internal";
cmsg.source = s_internal_source;
cmsg.rule = rule;
cmsg.fields = output_fields;

Expand Down
61 changes: 38 additions & 23 deletions userspace/falco/falco_outputs.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,57 +21,71 @@ limitations under the License.

#include "gen_filter.h"
#include "falco_common.h"
#include "token_bucket.h"
#include "falco_engine.h"
#include "outputs.h"
#include "formats.h"
#include "tbb/concurrent_queue.h"

//
// This class acts as the primary interface between a program and the
// falco output engine. The falco rules engine is implemented by a
// separate class falco_engine.
//
/*!
\brief This class acts as the primary interface between a program and the
falco output engine. The falco rules engine is implemented by a
separate class falco_engine.

All methods in this class are thread-safe. The output framework supports
a multi-producer model where messages are stored in a queue and consumed
by each configured output asynchrounously.
*/
class falco_outputs
{
public:
falco_outputs();
virtual ~falco_outputs();

void init(std::shared_ptr<falco_engine> engine,
bool json_output,
bool json_include_output_property,
bool json_include_tags_property,
uint32_t timeout,
uint32_t rate, uint32_t max_burst, bool buffered,
bool time_format_iso_8601, std::string hostname);
falco_outputs(
std::shared_ptr<falco_engine> engine,
const std::vector<falco::outputs::config>& outputs,
bool json_output,
bool json_include_output_property,
bool json_include_tags_property,
uint32_t timeout,
bool buffered,
bool time_format_iso_8601,
std::string hostname);

void add_output(falco::outputs::config oc);
virtual ~falco_outputs();

// Format then send the event to all configured outputs (`evt` is an event that has matched some rule).
/*!
\brief Format then send the event to all configured outputs (`evt`
is an event that has matched some rule).
*/
void handle_event(gen_event *evt, std::string &rule, std::string &source,
falco_common::priority_type priority, std::string &format, std::set<std::string> &tags);

// Format then send a generic message to all outputs. Not necessarily associated with any event.
/*!
\brief Format then send a generic message to all outputs.
Not necessarily associated with any event.
*/
void handle_msg(uint64_t now,
falco_common::priority_type priority,
std::string &msg,
std::string &rule,
std::map<std::string, std::string> &output_fields);

/*!
\brief Sends a cleanup message to all outputs.
Each output can have an implementation-specific behavior.
In general, this is used to flush or clean output buffers.
*/
void cleanup_outputs();

/*!
\brief Sends a message to all outputs that causes them to be closed and
reopened. Each output can have an implementation-specific behavior.
*/
void reopen_outputs();

private:
std::unique_ptr<falco_formats> m_formats;
bool m_initialized;

std::vector<falco::outputs::abstract_output *> m_outputs;

// Rate limits notifications
token_bucket m_notifications_tb;

bool m_buffered;
bool m_json_output;
bool m_time_format_iso_8601;
Expand Down Expand Up @@ -99,4 +113,5 @@ class falco_outputs
inline void push(ctrl_msg_type cmt);
void worker() noexcept;
void stop_worker();
void add_output(falco::outputs::config oc);
};