Skip to content

Commit

Permalink
Merge pull request #4560 from pwojcikdev/networking-fixes/daemon-latc…
Browse files Browse the repository at this point in the history
…h-rebased

Cleaner daemon stopping
  • Loading branch information
pwojcikdev authored Apr 17, 2024
2 parents b61020c + 2426f2c commit 4d66da7
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 94 deletions.
1 change: 1 addition & 0 deletions nano/lib/logging_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ enum class type
rep_tiers,
syn_cookies,
thread_runner,
signal_manager,

// bootstrap
bulk_pull_client,
Expand Down
35 changes: 27 additions & 8 deletions nano/lib/signal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,16 @@ void nano::signal_manager::register_signal_handler (int signum, std::function<vo
nano::signal_manager::base_handler (descriptor, error, signum);
});

log (boost::str (boost::format ("Registered signal handler for signal %d") % signum));
logger.debug (nano::log::type::signal_manager, "Registered signal handler for signal: {}", to_signal_name (signum));
}

void nano::signal_manager::base_handler (nano::signal_manager::signal_descriptor descriptor, boost::system::error_code const & error, int signum)
void nano::signal_manager::base_handler (nano::signal_manager::signal_descriptor descriptor, boost::system::error_code const & ec, int signum)
{
if (!error)
auto & logger = descriptor.sigman.logger;

if (!ec)
{
descriptor.sigman.log (boost::str (boost::format ("Signal received: %d") % signum));
logger.debug (nano::log::type::signal_manager, "Signal received: {}", to_signal_name (signum));

// call the user supplied function, if one is provided
if (descriptor.handler_func)
Expand All @@ -72,14 +74,31 @@ void nano::signal_manager::base_handler (nano::signal_manager::signal_descriptor
}
else
{
descriptor.sigman.log (boost::str (boost::format ("Signal handler %d will not repeat") % signum));
logger.debug (nano::log::type::signal_manager, "Signal handler {} will not repeat", to_signal_name (signum));

descriptor.sigset->clear ();
}

descriptor.sigman.log (boost::str (boost::format ("Signal processed: %d") % signum));
}
else
{
descriptor.sigman.log (boost::str (boost::format ("Signal error: %d (%s)") % error.value () % error.message ()));
logger.debug (nano::log::type::signal_manager, "Signal error: {} ({})", ec.message (), to_signal_name (signum));
}
}

std::string nano::to_signal_name (int signum)
{
switch (signum)
{
case SIGINT:
return "SIGINT";
case SIGTERM:
return "SIGTERM";
case SIGSEGV:
return "SIGSEGV";
case SIGABRT:
return "SIGABRT";
case SIGILL:
return "SIGILL";
}
return std::to_string (signum);
}
10 changes: 4 additions & 6 deletions nano/lib/signal_manager.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <nano/lib/logging.hpp>
#include <nano/lib/utility.hpp>

#include <boost/asio.hpp>
Expand Down Expand Up @@ -51,18 +52,14 @@ class signal_manager final
bool repeat;
};

/**
* Logging function of signal manager. It does nothing at the moment, it throws away the log.
* I expect to revisit this in the future. It also makes it easy to manually introduce logs, if needed temporarily.
*/
void log (std::string const &){};

/**
* This is the actual handler that is registered with boost asio.
* It calls the caller supplied function (if one is given) and sets the handler to repeat (or not).
*/
static void base_handler (nano::signal_manager::signal_descriptor descriptor, boost::system::error_code const & error, int signum);

nano::logger logger;

/** boost asio context to use */
boost::asio::io_context ioc;

Expand All @@ -76,4 +73,5 @@ class signal_manager final
std::thread thread;
};

std::string to_signal_name (int signum);
}
10 changes: 6 additions & 4 deletions nano/load_test/entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <nano/boost/process/child.hpp>
#include <nano/lib/blocks.hpp>
#include <nano/lib/logging.hpp>
#include <nano/lib/signal_manager.hpp>
#include <nano/lib/thread_runner.hpp>
#include <nano/lib/threading.hpp>
#include <nano/lib/tomlconfig.hpp>
Expand Down Expand Up @@ -595,13 +596,14 @@ int main (int argc, char * const * argv)
std::shared_ptr<boost::asio::io_context> ioc_shared = std::make_shared<boost::asio::io_context> ();
boost::asio::io_context & ioc{ *ioc_shared };

debug_assert (!nano::signal_handler_impl);
nano::signal_handler_impl = [&ioc] () {
nano::signal_manager sigman;

auto signal_handler = [&ioc] (int signum) {
ioc.stop ();
};

std::signal (SIGINT, &nano::signal_handler);
std::signal (SIGTERM, &nano::signal_handler);
sigman.register_signal_handler (SIGINT, signal_handler, true);
sigman.register_signal_handler (SIGTERM, signal_handler, false);

tcp::resolver resolver{ ioc };
auto const primary_node_results = resolver.resolve ("::1", std::to_string (rpc_port_start));
Expand Down
73 changes: 35 additions & 38 deletions nano/nano_node/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
#include <nano/node/openclwork.hpp>
#include <nano/rpc/rpc.hpp>

#include <boost/format.hpp>
#include <boost/process.hpp>

#include <csignal>
#include <iostream>
#include <latch>

#include <fmt/chrono.h>

Expand Down Expand Up @@ -56,8 +56,6 @@ void install_abort_signal_handler ()
#endif
}

volatile sig_atomic_t sig_int_or_term = 0;

constexpr std::size_t OPEN_FILE_DESCRIPTORS_LIMIT = 16384;
}

Expand Down Expand Up @@ -146,16 +144,28 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag
logger.info (nano::log::type::daemon, "Database backend: {}", node->store.vendor_get ());
logger.info (nano::log::type::daemon, "Start time: {:%c} UTC", fmt::gmtime (dateTime));

// IO context runner should be started first and stopped last to allow asio handlers to execute during node start/stop
runner = std::make_unique<nano::thread_runner> (io_ctx, node->config.io_threads);

node->start ();

nano::ipc::ipc_server ipc_server (*node, config.rpc);
std::atomic stopped{ false };

std::unique_ptr<nano::ipc::ipc_server> ipc_server = std::make_unique<nano::ipc::ipc_server> (*node, config.rpc);
std::unique_ptr<boost::process::child> rpc_process;
std::shared_ptr<nano::rpc> rpc;
std::unique_ptr<nano::rpc_handler_interface> rpc_handler;
std::shared_ptr<nano::rpc> rpc;

if (config.rpc_enable)
{
if (!config.rpc.child_process.enable)
{
auto stop_callback = [this, &stopped] () {
logger.warn (nano::log::type::daemon, "RPC stop request received, stopping...");
stopped = true;
stopped.notify_all ();
};

// Launch rpc in-process
nano::rpc_config rpc_config{ config.node.network_params.network };
auto error = nano::read_rpc_config_toml (data_path, rpc_config, flags.rpc_config_overrides);
Expand All @@ -166,16 +176,7 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag
}

rpc_config.tls_config = tls_config;
rpc_handler = std::make_unique<nano::inprocess_rpc_handler> (*node, ipc_server, config.rpc,
[&ipc_server, &workers = node->workers, io_ctx_w = std::weak_ptr{ io_ctx }] () {
ipc_server.stop ();
workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (3), [io_ctx_w] () {
if (auto io_ctx_l = io_ctx_w.lock ())
{
io_ctx_l->stop ();
}
});
});
rpc_handler = std::make_unique<nano::inprocess_rpc_handler> (*node, *ipc_server, config.rpc, stop_callback);
rpc = nano::get_rpc (io_ctx, rpc_config, *rpc_handler);
rpc->start ();
}
Expand All @@ -191,39 +192,35 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag

rpc_process = std::make_unique<boost::process::child> (config.rpc.child_process.rpc_path, "--daemon", "--data_path", data_path.string (), "--network", network);
}
debug_assert (rpc || rpc_process);
}

debug_assert (!nano::signal_handler_impl);
nano::signal_handler_impl = [this, io_ctx_w = std::weak_ptr{ io_ctx }] () {
logger.warn (nano::log::type::daemon, "Interrupt signal received, stopping...");

if (auto io_ctx_l = io_ctx_w.lock ())
{
io_ctx_l->stop ();
}
sig_int_or_term = 1;
auto signal_handler = [this, &stopped] (int signum) {
logger.warn (nano::log::type::daemon, "Interrupt signal received ({}), stopping...", to_signal_name (signum));
stopped = true;
stopped.notify_all ();
};

nano::signal_manager sigman;

// keep trapping Ctrl-C to avoid a second Ctrl-C interrupting tasks started by the first
sigman.register_signal_handler (SIGINT, &nano::signal_handler, true);

sigman.register_signal_handler (SIGINT, signal_handler, true);
// sigterm is less likely to come in bunches so only trap it once
sigman.register_signal_handler (SIGTERM, &nano::signal_handler, false);
sigman.register_signal_handler (SIGTERM, signal_handler, false);

runner = std::make_unique<nano::thread_runner> (io_ctx, node->config.io_threads);
runner->join ();
// Keep running until stopped flag is set
stopped.wait (false);

if (sig_int_or_term == 1)
logger.info (nano::log::type::daemon, "Stopping...");

if (rpc)
{
ipc_server.stop ();
node->stop ();
if (rpc)
{
rpc->stop ();
}
rpc->stop ();
}
ipc_server->stop ();
node->stop ();
io_ctx->stop ();
runner->join ();

if (rpc_process)
{
rpc_process->wait ();
Expand All @@ -244,5 +241,5 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag
logger.critical (nano::log::type::daemon, "Error deserializing config: {}", error.get_message ());
}

logger.info (nano::log::type::daemon, "Daemon exiting");
logger.info (nano::log::type::daemon, "Daemon stopped");
}
44 changes: 22 additions & 22 deletions nano/nano_rpc/entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
#include <boost/filesystem.hpp>
#include <boost/program_options.hpp>

#include <latch>

namespace
{
volatile sig_atomic_t sig_int_or_term = 0;

nano::logger logger{ "rpc_daemon" };

void run (std::filesystem::path const & data_path, std::vector<std::string> const & config_overrides)
Expand All @@ -41,7 +41,7 @@ void run (std::filesystem::path const & data_path, std::vector<std::string> cons
error = nano::read_tls_config_toml (data_path, *tls_config, logger);
if (error)
{
logger.critical (nano::log::type::daemon, "Error reading RPC TLS config: {}", error.get_message ());
logger.critical (nano::log::type::daemon_rpc, "Error reading RPC TLS config: {}", error.get_message ());
std::exit (1);
}
else
Expand All @@ -51,43 +51,43 @@ void run (std::filesystem::path const & data_path, std::vector<std::string> cons

std::shared_ptr<boost::asio::io_context> io_ctx = std::make_shared<boost::asio::io_context> ();

nano::signal_manager sigman;
runner = std::make_unique<nano::thread_runner> (io_ctx, rpc_config.rpc_process.io_threads);

try
{
nano::ipc_rpc_processor ipc_rpc_processor (*io_ctx, rpc_config);
auto rpc = nano::get_rpc (io_ctx, rpc_config, ipc_rpc_processor);
rpc->start ();

debug_assert (!nano::signal_handler_impl);
nano::signal_handler_impl = [io_ctx_w = std::weak_ptr{ io_ctx }] () {
logger.warn (nano::log::type::daemon, "Interrupt signal received, stopping...");
std::atomic stopped{ false };

if (auto io_ctx_l = io_ctx_w.lock ())
{
io_ctx_l->stop ();
}
sig_int_or_term = 1;
auto signal_handler = [&stopped] (int signum) {
logger.warn (nano::log::type::daemon_rpc, "Interrupt signal received ({}), stopping...", nano::to_signal_name (signum));
stopped = true;
stopped.notify_all ();
};

sigman.register_signal_handler (SIGINT, &nano::signal_handler, true);
sigman.register_signal_handler (SIGTERM, &nano::signal_handler, false);
nano::signal_manager sigman;
sigman.register_signal_handler (SIGINT, signal_handler, true);
sigman.register_signal_handler (SIGTERM, signal_handler, false);

runner = std::make_unique<nano::thread_runner> (io_ctx, rpc_config.rpc_process.io_threads);
runner->join ();
// Keep running until stopped flag is set
stopped.wait (false);

logger.info (nano::log::type::daemon_rpc, "Stopping...");

if (sig_int_or_term == 1)
{
rpc->stop ();
}
rpc->stop ();
io_ctx->stop ();
runner->join ();
}
catch (std::runtime_error const & e)
{
logger.critical (nano::log::type::daemon, "Error while running RPC: {}", e.what ());
logger.critical (nano::log::type::daemon_rpc, "Error while running RPC: {}", e.what ());
}
}
else
{
logger.critical (nano::log::type::daemon, "Error deserializing config: {}", error.get_message ());
logger.critical (nano::log::type::daemon_rpc, "Error deserializing config: {}", error.get_message ());
}

logger.info (nano::log::type::daemon_rpc, "Daemon stopped (RPC)");
Expand Down
13 changes: 0 additions & 13 deletions nano/secure/utility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,3 @@ void nano::remove_temporary_directories ()
}
}
}

namespace nano
{
/** A wrapper for handling signals */
std::function<void ()> signal_handler_impl;
void signal_handler (int sig)
{
if (signal_handler_impl != nullptr)
{
signal_handler_impl ();
}
}
}
3 changes: 0 additions & 3 deletions nano/secure/utility.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,4 @@ std::filesystem::path working_path (nano::networks network = nano::network_const
std::filesystem::path unique_path (nano::networks network = nano::network_constants::active_network);
// Remove all unique tmp directories created by the process
void remove_temporary_directories ();
// Generic signal handler declarations
extern std::function<void ()> signal_handler_impl;
void signal_handler (int sig);
}

0 comments on commit 4d66da7

Please sign in to comment.