Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(userspace/falco): make termination and signal handlers more stable #2239

Merged
merged 7 commits into from
Oct 11, 2022
49 changes: 28 additions & 21 deletions userspace/falco/app_actions/create_signal_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,24 @@ using namespace falco::app;
// provided application, and in unregister_signal_handlers it will be
// rebound back to the dummy application.

static application dummy;
static std::reference_wrapper<application> s_app = dummy;
static int inot_fd;

static void signal_callback(int signal)
static void terminate_signal_handler(int signal)
{
falco_logger::log(LOG_INFO, "SIGINT received, exiting...\n");
s_app.get().terminate();
ASSERT(falco::app::g_terminate.is_lock_free());
falco::app::g_terminate.store(APP_SIGNAL_SET, std::memory_order_seq_cst);
}

static void reopen_outputs(int signal)
static void reopen_outputs_signal_handler(int signal)
{
falco_logger::log(LOG_INFO, "SIGUSR1 received, reopening outputs...\n");
s_app.get().reopen_outputs();
ASSERT(falco::app::g_reopen_outputs.is_lock_free());
falco::app::g_reopen_outputs.store(APP_SIGNAL_SET, std::memory_order_seq_cst);
}

static void restart_falco(int signal)
static void restart_signal_handler(int signal)
{
falco_logger::log(LOG_INFO, "SIGHUP received, restarting...\n");
s_app.get().restart();
ASSERT(falco::app::g_restart.is_lock_free());
falco::app::g_restart.store(APP_SIGNAL_SET, std::memory_order_seq_cst);
}

bool application::create_handler(int sig, void (*func)(int), run_result &ret)
Expand All @@ -74,21 +72,32 @@ bool application::create_handler(int sig, void (*func)(int), run_result &ret)

application::run_result application::create_signal_handlers()
{
falco::app::g_terminate.store(APP_SIGNAL_NOT_SET, std::memory_order_seq_cst);
falco::app::g_restart.store(APP_SIGNAL_NOT_SET, std::memory_order_seq_cst);
falco::app::g_reopen_outputs.store(APP_SIGNAL_NOT_SET, std::memory_order_seq_cst);

if (!g_terminate.is_lock_free()
|| !g_restart.is_lock_free()
|| !g_reopen_outputs.is_lock_free())
{
falco_logger::log(LOG_WARNING, "Bundled atomics implementation is not lock-free, signal handlers may be unstable\n");
}

run_result ret;
s_app = *this;
if(! create_handler(SIGINT, ::signal_callback, ret) ||
! create_handler(SIGTERM, ::signal_callback, ret) ||
! create_handler(SIGUSR1, ::reopen_outputs, ret) ||
! create_handler(SIGHUP, ::restart_falco, ret))
if(! create_handler(SIGINT, ::terminate_signal_handler, ret) ||
! create_handler(SIGTERM, ::terminate_signal_handler, ret) ||
! create_handler(SIGUSR1, ::reopen_outputs_signal_handler, ret) ||
! create_handler(SIGHUP, ::restart_signal_handler, ret))
Comment on lines +87 to +90
Copy link
Member

@Andreagit97 Andreagit97 Oct 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about something like:

Suggested change
if(! create_handler(SIGINT, ::terminate_signal_handler, ret) ||
! create_handler(SIGTERM, ::terminate_signal_handler, ret) ||
! create_handler(SIGUSR1, ::reopen_outputs_signal_handler, ret) ||
! create_handler(SIGHUP, ::restart_signal_handler, ret))
return !create_handler(SIGINT, ::terminate_signal_handler, ret) ||
!create_handler(SIGTERM, ::terminate_signal_handler, ret) ||
!create_handler(SIGUSR1, ::reopen_outputs_signal_handler, ret) ||
!create_handler(SIGHUP, ::restart_signal_handler, ret)) ? run_result::fatal("") : run_result::ok();

Or, if we don't want to return fatal, just return run_result::ok() in both cases 🤔 Just a nitpick, feel free to ignore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create_handler already sets its own error results in the passed-in ret, so errors are already caught.

{
s_app = dummy;
// we use the if just to make sure we return at the first failed statement
}

return ret;
}

application::run_result application::attach_inotify_signals()
{
if (m_state->config->m_watch_config_files)
if (m_state->config->m_watch_config_files)
{
inot_fd = inotify_init();
if (inot_fd == -1)
Expand All @@ -99,7 +108,7 @@ application::run_result application::attach_inotify_signals()
struct sigaction sa;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART;
sa.sa_handler = restart_falco;
sa.sa_handler = restart_signal_handler;
if (sigaction(SIGIO, &sa, NULL) == -1)
{
return run_result::fatal("Failed to link SIGIO to inotify handler");
Expand Down Expand Up @@ -169,7 +178,5 @@ bool application::unregister_signal_handlers(std::string &errstr)
errstr = ret.errstr;
return false;
}

s_app = dummy;
return true;
}
64 changes: 50 additions & 14 deletions userspace/falco/app_actions/process_events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <atomic>
#include <unordered_map>

#include "falco_utils.h"
Expand Down Expand Up @@ -97,9 +98,19 @@ application::run_result application::do_inspect(
{
rc = inspector->next(&ev);

if(m_state->terminate.load(std::memory_order_seq_cst)
|| m_state->restart.load(std::memory_order_seq_cst))
if (should_reopen_outputs())
{
reopen_outputs();
}

if(should_terminate())
{
terminate();
break;
}
else if(should_restart())
{
restart();
break;
}
else if(rc == SCAP_TIMEOUT)
Expand Down Expand Up @@ -217,6 +228,7 @@ void application::process_inspector_events(
std::shared_ptr<sinsp> inspector,
std::shared_ptr<stats_writer> statsw,
std::string source, // an empty source represents capture mode
std::atomic<bool>* finished,
application::run_result* res) noexcept
{
try
Expand Down Expand Up @@ -264,6 +276,8 @@ void application::process_inspector_events(
{
*res = run_result::fatal(e.what());
}

finished->store(true, std::memory_order_seq_cst);
}

static std::shared_ptr<stats_writer> init_stats_writer(const cmdline_options& opts)
Expand Down Expand Up @@ -301,7 +315,8 @@ application::run_result application::process_events()
return res;
}

process_inspector_events(m_state->offline_inspector, statsw, "", &res);
std::atomic<bool> finished;
process_inspector_events(m_state->offline_inspector, statsw, "", &finished, &res);
m_state->offline_inspector->close();

// Honor -M also when using a trace file.
Expand All @@ -324,6 +339,10 @@ application::run_result application::process_events()

// the name of the source of which events are processed
std::string source;
// set to true when the event processing loop finishes
std::unique_ptr<std::atomic<bool>> finished;
// set to true when the result has been collected after finishing
std::unique_ptr<std::atomic<bool>> joined;
// the result of the event processing loop
application::run_result res;
// if non-null, the thread on which events are processed
Expand All @@ -340,6 +359,10 @@ application::run_result application::process_events()
ctxs.emplace_back();
auto& ctx = ctxs[ctxs.size() - 1];
ctx.source = source;
ctx.finished.reset(new std::atomic<bool>());
ctx.joined.reset(new std::atomic<bool>());
ctx.finished->store(false, std::memory_order_seq_cst);
ctx.joined->store(false, std::memory_order_seq_cst);
auto src_info = m_state->source_infos.at(source);

try
Expand All @@ -351,19 +374,20 @@ application::run_result application::process_events()
// note: we don't return here because we need to reach
// the thread termination loop below to make sure all
// already-spawned threads get terminated gracefully
ctx.finished->store(true, std::memory_order_seq_cst);
break;
}

if (m_state->enabled_sources.size() == 1)
{
// optimization: with only one source we don't spawn additional threads
process_inspector_events(src_info->inspector, statsw, source, &ctx.res);
process_inspector_events(src_info->inspector, statsw, source, ctx.finished.get(), &ctx.res);
}
else
{
ctx.thread.reset(new std::thread(
&application::process_inspector_events,
this, src_info->inspector, statsw, source, &ctx.res));
this, src_info->inspector, statsw, source, ctx.finished.get(), &ctx.res));
}
}
catch (std::exception &e)
Expand All @@ -372,6 +396,7 @@ application::run_result application::process_events()
// the thread termination loop below to make sure all
// already-spawned threads get terminated gracefully
ctx.res = run_result::fatal(e.what());
ctx.finished->store(true, std::memory_order_seq_cst);
break;
}
}
Expand All @@ -385,24 +410,35 @@ application::run_result application::process_events()
{
if (!res.success && !termination_forced)
{
falco_logger::log(LOG_INFO, "An error occurred in an event source, forcing termination...\n");
terminate();
termination_forced = true;
}
for (auto &ctx : ctxs)
{
if (ctx.thread)
if (ctx.finished->load(std::memory_order_seq_cst)
&& !ctx.joined->load(std::memory_order_seq_cst))
{
if (!ctx.thread->joinable())
if (ctx.thread)
{
continue;
if (!ctx.thread->joinable())
{
// thread has finished executing but
// we already joined it, so we skip to the next one.
// technically, we should never get here because
// ctx.joined should already be true at this point
continue;
}
ctx.thread->join();
}
ctx.thread->join();
ctx.thread = nullptr;

falco_logger::log(LOG_DEBUG, "Closing event source '" + ctx.source + "'\n");
m_state->source_infos.at(ctx.source)->inspector->close();

res = run_result::merge(res, ctx.res);
ctx.joined->store(true, std::memory_order_seq_cst);
closed_count++;
}
falco_logger::log(LOG_DEBUG, "Closing event source '" + ctx.source + "'\n");
m_state->source_infos.at(ctx.source)->inspector->close();
res = run_result::merge(res, ctx.res);
closed_count++;
}
}
}
Expand Down
57 changes: 45 additions & 12 deletions userspace/falco/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,41 @@ limitations under the License.

using namespace std::placeholders;

// Claims a signal flag so that the associated action is taken at most once.
//
// Atomically moves `v` from APP_SIGNAL_SET to APP_SIGNAL_ACTION_TAKEN and
// returns true to the caller that performed that transition. Returns false
// as soon as `v` is observed to be APP_SIGNAL_ACTION_TAKEN, meaning the
// action has already been claimed. If `v` is still APP_SIGNAL_NOT_SET, the
// flag is first raised to APP_SIGNAL_SET here (as if the signal had been
// received) and the transition is attempted again.
static inline bool should_take_action_to_signal(std::atomic<int>& v)
{
	for (;;)
	{
		// a failed CAS overwrites `expected` with the observed value,
		// so it is re-armed at the start of every attempt
		int expected = APP_SIGNAL_SET;
		if (v.compare_exchange_weak(
			expected,
			APP_SIGNAL_ACTION_TAKEN,
			std::memory_order_seq_cst,
			std::memory_order_seq_cst))
		{
			return true;
		}

		// the action was already taken, there is nothing left to do
		if (expected == APP_SIGNAL_ACTION_TAKEN)
		{
			return false;
		}

		// the signal was not actually received, so pretend it was
		if (expected == APP_SIGNAL_NOT_SET)
		{
			v.store(APP_SIGNAL_SET, std::memory_order_seq_cst);
		}

		// any other outcome (e.g. a spurious weak-CAS failure) simply retries
	}
}

namespace falco {
namespace app {

// Signal state flags, each initialized to APP_SIGNAL_NOT_SET.
// They are claimed through should_take_action_to_signal(), which moves a
// flag from APP_SIGNAL_SET to APP_SIGNAL_ACTION_TAKEN.

// termination flag, claimed by application::terminate()
std::atomic<int> g_terminate(APP_SIGNAL_NOT_SET);
// restart flag, claimed by application::restart()
std::atomic<int> g_restart(APP_SIGNAL_NOT_SET);
// output-reopening flag, claimed by application::reopen_outputs(), which
// stores APP_SIGNAL_NOT_SET back into it once the outputs have been reopened
std::atomic<int> g_reopen_outputs(APP_SIGNAL_NOT_SET);

application::run_result::run_result()
: success(true), errstr(""), proceed(true)
{
Expand All @@ -39,9 +71,7 @@ application::run_result::~run_result()
}

application::state::state()
: restart(false),
terminate(false),
loaded_sources(),
: loaded_sources(),
enabled_sources(),
source_infos(),
plugin_configs(),
Expand Down Expand Up @@ -70,27 +100,30 @@ application::~application()

void application::terminate()
{
if(m_state != nullptr)
if (should_take_action_to_signal(falco::app::g_terminate))
{
m_state->terminate.store(true, std::memory_order_seq_cst);
falco_logger::log(LOG_INFO, "SIGINT received, exiting...\n");
}
}

void application::reopen_outputs()
{
if(m_state != nullptr && m_state->outputs != nullptr)
if (should_take_action_to_signal(falco::app::g_reopen_outputs))
{
// note: it is ok to do this inside the signal handler because
// in the current falco_outputs implementation this is non-blocking
m_state->outputs->reopen_outputs();
falco_logger::log(LOG_INFO, "SIGUSR1 received, reopening outputs...\n");
if(m_state != nullptr && m_state->outputs != nullptr)
{
m_state->outputs->reopen_outputs();
}
falco::app::g_reopen_outputs.store(APP_SIGNAL_NOT_SET);
}
}

void application::restart()
{
if(m_state != nullptr)
if (should_take_action_to_signal(falco::app::g_restart))
{
m_state->restart.store(true, std::memory_order_seq_cst);
falco_logger::log(LOG_INFO, "SIGHUP received, restarting...\n");
}
}

Expand Down Expand Up @@ -196,7 +229,7 @@ bool application::run(std::string &errstr, bool &restart)
errstr = res.errstr;
}

restart = m_state->restart;
restart = should_restart();

return res.success;
}
Expand Down
Loading