From 12e8cf011d8a54f5d3ddc77fddd5e65543565e81 Mon Sep 17 00:00:00 2001 From: Jason Dellaluce Date: Mon, 10 Oct 2022 10:31:39 +0000 Subject: [PATCH 1/7] fix(userspace/falco): make signal handlers safe with multi-threading Signed-off-by: Jason Dellaluce --- .../app_actions/create_signal_handlers.cpp | 48 ++++++++-------- .../falco/app_actions/process_events.cpp | 13 ++++- userspace/falco/application.cpp | 56 +++++++++++++++---- userspace/falco/application.h | 37 ++++++++---- 4 files changed, 108 insertions(+), 46 deletions(-) diff --git a/userspace/falco/app_actions/create_signal_handlers.cpp b/userspace/falco/app_actions/create_signal_handlers.cpp index 82ca99007b3..1d318b87eb8 100644 --- a/userspace/falco/app_actions/create_signal_handlers.cpp +++ b/userspace/falco/app_actions/create_signal_handlers.cpp @@ -30,26 +30,24 @@ using namespace falco::app; // provided application, and in unregister_signal_handlers it will be // rebound back to the dummy application. -static application dummy; -static std::reference_wrapper s_app = dummy; static int inot_fd; -static void signal_callback(int signal) +static void terminate_signal_handler(int signal) { - falco_logger::log(LOG_INFO, "SIGINT received, exiting...\n"); - s_app.get().terminate(); + ASSERT(falco::app::g_terminate.is_lock_free()); + falco::app::g_terminate.store(APP_SIGNAL_SET, std::memory_order_seq_cst); } -static void reopen_outputs(int signal) +static void reopen_outputs_signal_handler(int signal) { - falco_logger::log(LOG_INFO, "SIGUSR1 received, reopening outputs...\n"); - s_app.get().reopen_outputs(); + ASSERT(falco::app::g_reopen_outputs.is_lock_free()); + falco::app::g_reopen_outputs.store(APP_SIGNAL_SET, std::memory_order_seq_cst); } -static void restart_falco(int signal) +static void restart_signal_handler(int signal) { - falco_logger::log(LOG_INFO, "SIGHUP received, restarting...\n"); - s_app.get().restart(); + ASSERT(falco::app::g_restart.is_lock_free()); + falco::app::g_restart.store(APP_SIGNAL_SET, std::memory_order_seq_cst); } bool application::create_handler(int sig, void (*func)(int), run_result &ret) @@ -74,21 +72,29 @@ bool application::create_handler(int sig, void (*func)(int), run_result &ret) application::run_result application::create_signal_handlers() { - run_result ret; - s_app = *this; - if(! create_handler(SIGINT, ::signal_callback, ret) || - ! create_handler(SIGTERM, ::signal_callback, ret) || - ! create_handler(SIGUSR1, ::reopen_outputs, ret) || - ! create_handler(SIGHUP, ::restart_falco, ret)) + falco::app::g_terminate.store(APP_SIGNAL_NOT_SET, std::memory_order_seq_cst); + falco::app::g_restart.store(APP_SIGNAL_NOT_SET, std::memory_order_seq_cst); + falco::app::g_reopen_outputs.store(APP_SIGNAL_NOT_SET, std::memory_order_seq_cst); + + if (!g_terminate.is_lock_free() + || !g_restart.is_lock_free() + || !g_reopen_outputs.is_lock_free()) { - s_app = dummy; + falco_logger::log(LOG_WARNING, "Bundled atomics implementation is not lock-free, signal handlers may be unstable\n"); } + + // we use the if just to make sure we return at the first failed statement + run_result ret; + if(! create_handler(SIGINT, ::terminate_signal_handler, ret) || + ! create_handler(SIGTERM, ::terminate_signal_handler, ret) || + ! create_handler(SIGUSR1, ::reopen_outputs_signal_handler, ret) || + ! create_handler(SIGHUP, ::restart_signal_handler, ret)); return ret; } application::run_result application::attach_inotify_signals() { - if (m_state->config->m_watch_config_files) + if (m_state->config->m_watch_config_files) { inot_fd = inotify_init(); if (inot_fd == -1) @@ -99,7 +105,7 @@ application::run_result application::attach_inotify_signals() struct sigaction sa; sigemptyset(&sa.sa_mask); sa.sa_flags = SA_RESTART; - sa.sa_handler = restart_falco; + sa.sa_handler = restart_signal_handler; if (sigaction(SIGIO, &sa, NULL) == -1) { return run_result::fatal("Failed to link SIGIO to inotify handler"); @@ -169,7 +175,5 @@ bool application::unregister_signal_handlers(std::string &errstr) errstr = ret.errstr; return false; } - - s_app = dummy; return true; } diff --git a/userspace/falco/app_actions/process_events.cpp b/userspace/falco/app_actions/process_events.cpp index 74563919ac1..214f8bfcbaa 100644 --- a/userspace/falco/app_actions/process_events.cpp +++ b/userspace/falco/app_actions/process_events.cpp @@ -97,11 +97,20 @@ application::run_result application::do_inspect( { rc = inspector->next(&ev); - if(m_state->terminate.load(std::memory_order_seq_cst) - || m_state->restart.load(std::memory_order_seq_cst)) + if(should_terminate()) { + terminate(); break; } + else if(should_restart()) + { + restart(); + break; + } + else if (should_reopen_outputs()) + { + reopen_outputs(); + } else if(rc == SCAP_TIMEOUT) { if(unlikely(ev == nullptr)) diff --git a/userspace/falco/application.cpp b/userspace/falco/application.cpp index 43933993b78..e44ee5ca189 100644 --- a/userspace/falco/application.cpp +++ b/userspace/falco/application.cpp @@ -26,9 +26,41 @@ limitations under the License. using namespace std::placeholders; +static inline bool should_take_action_to_signal(std::atomic& v) +{ + // we expected the signal to be received, and we try to set action-taken flag + int value = APP_SIGNAL_SET; + while (!v.compare_exchange_weak( + value, + APP_SIGNAL_ACTION_TAKEN, + std::memory_order_seq_cst, + std::memory_order_seq_cst)) + { + // application already took action, there's no need to do it twice + if (value == APP_SIGNAL_ACTION_TAKEN) + { + return false; + } + + // signal did was not really received, so we "fake" receiving it + if (value == APP_SIGNAL_NOT_SET) + { + v.store(APP_SIGNAL_SET, std::memory_order_seq_cst); + } + + // reset "expected" CAS variable and keep looping until we succeed + value = APP_SIGNAL_SET; + } + return true; +} + namespace falco { namespace app { +std::atomic g_terminate(APP_SIGNAL_NOT_SET); +std::atomic g_restart(APP_SIGNAL_NOT_SET); +std::atomic g_reopen_outputs(APP_SIGNAL_NOT_SET); + application::run_result::run_result() : success(true), errstr(""), proceed(true) { @@ -39,9 +71,7 @@ application::run_result::~run_result() } application::state::state() - : restart(false), - terminate(false), - loaded_sources(), + : loaded_sources(), enabled_sources(), source_infos(), plugin_configs(), @@ -70,27 +100,29 @@ application::~application() void application::terminate() { - if(m_state != nullptr) + if (should_take_action_to_signal(falco::app::g_terminate)) { - m_state->terminate.store(true, std::memory_order_seq_cst); + falco_logger::log(LOG_INFO, "SIGINT received, exiting...\n"); } } void application::reopen_outputs() { - if(m_state != nullptr && m_state->outputs != nullptr) + if (should_take_action_to_signal(falco::app::g_reopen_outputs)) { - // note: it is ok to do this inside the signal handler because - // in the current falco_outputs implementation this is non-blocking - m_state->outputs->reopen_outputs(); + falco_logger::log(LOG_INFO, "SIGUSR1 received, reopening outputs...\n"); + if(m_state != nullptr && m_state->outputs != nullptr) + { + m_state->outputs->reopen_outputs(); + } } } void application::restart() { - if(m_state != nullptr) + if (should_take_action_to_signal(falco::app::g_restart)) { - m_state->restart.store(true, std::memory_order_seq_cst); + falco_logger::log(LOG_INFO, "SIGHUP received, restarting...\n"); } } @@ -196,7 +228,7 @@ bool application::run(std::string &errstr, bool &restart) errstr = res.errstr; } - restart = m_state->restart; + restart = should_restart(); return res.success; } diff --git a/userspace/falco/application.h b/userspace/falco/application.h index 13bc3aaa280..9b123fa46b7 100644 --- a/userspace/falco/application.h +++ b/userspace/falco/application.h @@ -30,9 +30,19 @@ limitations under the License. #include #include +#define APP_SIGNAL_NOT_SET 0 // The signal flag is not set +#define APP_SIGNAL_SET 1 // The signal flag has been set +#define APP_SIGNAL_ACTION_TAKEN 2 // The signal flag has been set and the application took action + namespace falco { namespace app { +// these are used to control the lifecycle of the application +// through signal handlers or internal calls +extern std::atomic g_terminate; +extern std::atomic g_restart; +extern std::atomic g_reopen_outputs; + class application { public: application(); @@ -42,13 +52,6 @@ class application { application(const application&) = delete; application& operator = (const application&) = delete; - // These are only used in signal handlers. Other than there, - // the control flow of the application should not be changed - // from the outside. - void terminate(); - void reopen_outputs(); - void restart(); - bool init(int argc, char **argv, std::string &errstr); // Returns whether the application completed with errors or @@ -86,9 +89,6 @@ class application { state(); virtual ~state(); - std::atomic restart; - std::atomic terminate; - std::shared_ptr config; std::shared_ptr outputs; std::shared_ptr engine; @@ -317,6 +317,23 @@ class application { return !m_options.gvisor_config.empty(); } + // used in signal handlers to control the flow of the application + void terminate(); + void restart(); + void reopen_outputs(); + inline bool should_terminate() + { + return g_terminate.load(std::memory_order_seq_cst) != APP_SIGNAL_NOT_SET; + } + inline bool should_restart() + { + return g_restart.load(std::memory_order_seq_cst) != APP_SIGNAL_NOT_SET; + } + inline bool should_reopen_outputs() + { + return g_reopen_outputs.load(std::memory_order_seq_cst) != APP_SIGNAL_NOT_SET; + } + std::unique_ptr m_state; cmdline_options m_options; bool m_initialized; From 528059de3c8800c6857b11bbc1d32c5217fee394 Mon Sep 17 00:00:00 2001 From: Jason Dellaluce Date: Mon, 10 Oct 2022 11:02:15 +0000 Subject: [PATCH 2/7] fix(userspace/falco): make multi-source termination condition more stable Signed-off-by: Jason Dellaluce --- .../falco/app_actions/process_events.cpp | 48 ++++++++++++++----- userspace/falco/application.h | 1 + 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/userspace/falco/app_actions/process_events.cpp b/userspace/falco/app_actions/process_events.cpp index 214f8bfcbaa..49a4fc41a80 100644 --- a/userspace/falco/app_actions/process_events.cpp +++ b/userspace/falco/app_actions/process_events.cpp @@ -20,6 +20,7 @@ limitations under the License. #include #include #include +#include #include #include "falco_utils.h" @@ -226,6 +227,7 @@ void application::process_inspector_events( std::shared_ptr inspector, std::shared_ptr statsw, std::string source, // an empty source represents capture mode + std::atomic* finished, application::run_result* res) noexcept { try @@ -273,6 +275,8 @@ void application::process_inspector_events( { *res = run_result::fatal(e.what()); } + + finished->store(true, std::memory_order_seq_cst); } static std::shared_ptr init_stats_writer(const cmdline_options& opts) @@ -310,7 +314,8 @@ application::run_result application::process_events() return res; } - process_inspector_events(m_state->offline_inspector, statsw, "", &res); + std::atomic finished; + process_inspector_events(m_state->offline_inspector, statsw, "", &finished, &res); m_state->offline_inspector->close(); // Honor -M also when using a trace file. @@ -333,6 +338,10 @@ application::run_result application::process_events() // the name of the source of which events are processed std::string source; + // set to true when the event processing loop finishes + std::unique_ptr> finished; + // set to true when the result has been collected after finishing + std::unique_ptr> joined; // the result of the event processing loop application::run_result res; // if non-null, the thread on which events are processed @@ -349,6 +358,10 @@ application::run_result application::process_events() ctxs.emplace_back(); auto& ctx = ctxs[ctxs.size() - 1]; ctx.source = source; + ctx.finished.reset(new std::atomic()); + ctx.joined.reset(new std::atomic()); + ctx.finished->store(false, std::memory_order_seq_cst); + ctx.joined->store(false, std::memory_order_seq_cst); auto src_info = m_state->source_infos.at(source); try @@ -366,13 +379,13 @@ application::run_result application::process_events() if (m_state->enabled_sources.size() == 1) { // optimization: with only one source we don't spawn additional threads - process_inspector_events(src_info->inspector, statsw, source, &ctx.res); + process_inspector_events(src_info->inspector, statsw, source, ctx.finished.get(), &ctx.res); } else { ctx.thread.reset(new std::thread( &application::process_inspector_events, - this, src_info->inspector, statsw, source, &ctx.res)); + this, src_info->inspector, statsw, source, ctx.finished.get(), &ctx.res)); } } catch (std::exception &e) @@ -394,24 +407,35 @@ application::run_result application::process_events() { if (!res.success && !termination_forced) { + falco_logger::log(LOG_INFO, "An error occurred in one event source, forcing termination...\n"); terminate(); termination_forced = true; } for (auto &ctx : ctxs) { - if (ctx.thread) + if (ctx.finished->load(std::memory_order_seq_cst) + && !ctx.joined->load(std::memory_order_seq_cst)) { - if (!ctx.thread->joinable()) + if (ctx.thread) { - continue; + if (!ctx.thread->joinable()) + { + // thread has finished executing but + // we already joined it, so we skip to the next one. + // technically, we should never get here because + // ctx.joined should already be true at this point + continue; + } + ctx.thread->join(); } - ctx.thread->join(); - ctx.thread = nullptr; + + falco_logger::log(LOG_DEBUG, "Closing event source '" + ctx.source + "'\n"); + m_state->source_infos.at(ctx.source)->inspector->close(); + + res = run_result::merge(res, ctx.res); + ctx.joined->store(true, std::memory_order_seq_cst); + closed_count++; } - falco_logger::log(LOG_DEBUG, "Closing event source '" + ctx.source + "'\n"); - m_state->source_infos.at(ctx.source)->inspector->close(); - res = run_result::merge(res, ctx.res); - closed_count++; } } } diff --git a/userspace/falco/application.h b/userspace/falco/application.h index 9b123fa46b7..767f3257414 100644 --- a/userspace/falco/application.h +++ b/userspace/falco/application.h @@ -304,6 +304,7 @@ class application { std::shared_ptr inspector, std::shared_ptr statsw, std::string source, // an empty source represents capture mode + std::atomic* finished, run_result* res) noexcept; /* Returns true if we are in capture mode. */ From 150f417efb297784d4ada2a58f18896c391bed1f Mon Sep 17 00:00:00 2001 From: Jason Dellaluce Date: Mon, 10 Oct 2022 11:13:28 +0000 Subject: [PATCH 3/7] chore(userspace/falco): fix typo Signed-off-by: Jason Dellaluce --- userspace/falco/app_actions/process_events.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/userspace/falco/app_actions/process_events.cpp b/userspace/falco/app_actions/process_events.cpp index 49a4fc41a80..d7f95227908 100644 --- a/userspace/falco/app_actions/process_events.cpp +++ b/userspace/falco/app_actions/process_events.cpp @@ -407,7 +407,7 @@ application::run_result application::process_events() { if (!res.success && !termination_forced) { - falco_logger::log(LOG_INFO, "An error occurred in one event source, forcing termination...\n"); + falco_logger::log(LOG_INFO, "An error occurred in an event source, forcing termination...\n"); terminate(); termination_forced = true; } From 02518a0849c8d6cb763243bdd8933f52a4823a2e Mon Sep 17 00:00:00 2001 From: Jason Dellaluce Date: Mon, 10 Oct 2022 11:53:37 +0000 Subject: [PATCH 4/7] fix(userspace/falco): solve warning Signed-off-by: Jason Dellaluce --- userspace/falco/app_actions/create_signal_handlers.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/userspace/falco/app_actions/create_signal_handlers.cpp b/userspace/falco/app_actions/create_signal_handlers.cpp index 1d318b87eb8..08ae48a9e81 100644 --- a/userspace/falco/app_actions/create_signal_handlers.cpp +++ b/userspace/falco/app_actions/create_signal_handlers.cpp @@ -75,7 +75,7 @@ application::run_result application::create_signal_handlers() falco::app::g_terminate.store(APP_SIGNAL_NOT_SET, std::memory_order_seq_cst); falco::app::g_restart.store(APP_SIGNAL_NOT_SET, std::memory_order_seq_cst); falco::app::g_reopen_outputs.store(APP_SIGNAL_NOT_SET, std::memory_order_seq_cst); - + if (!g_terminate.is_lock_free() || !g_restart.is_lock_free() || !g_reopen_outputs.is_lock_free()) @@ -83,12 +83,15 @@ application::run_result application::create_signal_handlers() falco_logger::log(LOG_WARNING, "Bundled atomics implementation is not lock-free, signal handlers may be unstable\n"); } - // we use the if just to make sure we return at the first failed statement run_result ret; if(! create_handler(SIGINT, ::terminate_signal_handler, ret) || ! create_handler(SIGTERM, ::terminate_signal_handler, ret) || ! create_handler(SIGUSR1, ::reopen_outputs_signal_handler, ret) || - ! create_handler(SIGHUP, ::restart_signal_handler, ret)); + ! create_handler(SIGHUP, ::restart_signal_handler, ret)) + { + // we use the if just to make sure we return at the first failed statement + } + return ret; } From 8884c6a04f35b904a389e7657174152eb2ba98ca Mon Sep 17 00:00:00 2001 From: Jason Dellaluce Date: Tue, 11 Oct 2022 15:40:58 +0000 Subject: [PATCH 5/7] fix(userspace/falco/falco): allow output reopening to happen multiple times Signed-off-by: Jason Dellaluce --- userspace/falco/application.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/userspace/falco/application.cpp b/userspace/falco/application.cpp index e44ee5ca189..e4df076c128 100644 --- a/userspace/falco/application.cpp +++ b/userspace/falco/application.cpp @@ -115,6 +115,7 @@ void application::reopen_outputs() { m_state->outputs->reopen_outputs(); } + falco::app::g_reopen_outputs.store(APP_SIGNAL_NOT_SET); } } From 9f39378d99bfab65182041822d99c166774bc331 Mon Sep 17 00:00:00 2001 From: Jason Dellaluce Date: Tue, 11 Oct 2022 15:42:39 +0000 Subject: [PATCH 6/7] fix(userspace/falco): check conditions in right order Signed-off-by: Jason Dellaluce --- userspace/falco/app_actions/process_events.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/userspace/falco/app_actions/process_events.cpp b/userspace/falco/app_actions/process_events.cpp index d7f95227908..07f24cda9c1 100644 --- a/userspace/falco/app_actions/process_events.cpp +++ b/userspace/falco/app_actions/process_events.cpp @@ -98,6 +98,11 @@ application::run_result application::do_inspect( { rc = inspector->next(&ev); + if (should_reopen_outputs()) + { + reopen_outputs(); + } + if(should_terminate()) { terminate(); @@ -108,10 +113,6 @@ application::run_result application::do_inspect( restart(); break; } - else if (should_reopen_outputs()) - { - reopen_outputs(); - } else if(rc == SCAP_TIMEOUT) { if(unlikely(ev == nullptr)) From 976cafa69ce9875d175edb2ac60486f6e478f53a Mon Sep 17 00:00:00 2001 From: Jason Dellaluce Date: Tue, 11 Oct 2022 15:48:16 +0000 Subject: [PATCH 7/7] fix(userspace/falco): properly handle termination at source opening failures Signed-off-by: Jason Dellaluce --- userspace/falco/app_actions/process_events.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/userspace/falco/app_actions/process_events.cpp b/userspace/falco/app_actions/process_events.cpp index 07f24cda9c1..de55e219fba 100644 --- a/userspace/falco/app_actions/process_events.cpp +++ b/userspace/falco/app_actions/process_events.cpp @@ -374,6 +374,7 @@ application::run_result application::process_events() // note: we don't return here because we need to reach // the thread termination loop below to make sure all // already-spawned threads get terminated gracefully + ctx.finished->store(true, std::memory_order_seq_cst); break; } @@ -395,6 +396,7 @@ application::run_result application::process_events() // the thread termination loop below to make sure all // already-spawned threads get terminated gracefully ctx.res = run_result::fatal(e.what()); + ctx.finished->store(true, std::memory_order_seq_cst); break; } }