Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async task helper #4590

Merged
merged 5 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions nano/core_test/async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,66 @@ TEST (async, cancellation)

ASSERT_EQ (fut.wait_for (500ms), std::future_status::ready);
ASSERT_NO_THROW (fut.get ());
}

TEST (async, task)
{
nano::test::system system;

auto io_ctx = std::make_shared<asio::io_context> ();
nano::thread_runner runner{ io_ctx, 1 };
nano::async::strand strand{ io_ctx->get_executor () };

nano::async::task task{ strand };

// Default state, empty task
ASSERT_FALSE (task.joinable ());

task = nano::async::task (strand, [&] () -> asio::awaitable<void> {
co_await nano::async::sleep_for (500ms);
});

// Task should now be joinable, but not ready
ASSERT_TRUE (task.joinable ());
ASSERT_FALSE (task.ready ());

WAIT (50ms);
ASSERT_TRUE (task.joinable ());
ASSERT_FALSE (task.ready ());

WAIT (1s);

// Task completed, not yet joined
ASSERT_TRUE (task.joinable ());
ASSERT_TRUE (task.ready ());

task.join ();

ASSERT_FALSE (task.joinable ());
}

TEST (async, task_cancel)
{
nano::test::system system;

auto io_ctx = std::make_shared<asio::io_context> ();
nano::thread_runner runner{ io_ctx, 1 };
nano::async::strand strand{ io_ctx->get_executor () };

nano::async::task task = nano::async::task (strand, [&] () -> asio::awaitable<void> {
co_await nano::async::sleep_for (10s);
});

// Task should be joinable, but not ready
WAIT (100ms);
ASSERT_TRUE (task.joinable ());
ASSERT_FALSE (task.ready ());

task.cancel ();

WAIT (500ms);
ASSERT_TRUE (task.joinable ());
ASSERT_TRUE (task.ready ());

// It should not be necessary to join a ready task
}
107 changes: 102 additions & 5 deletions nano/lib/async.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include <boost/asio.hpp>

#include <future>

namespace asio = boost::asio;

namespace nano::async
Expand All @@ -27,14 +29,30 @@ class cancellation
{
public:
explicit cancellation (nano::async::strand & strand) :
strand{ strand }
strand{ strand },
signal{ std::make_unique<asio::cancellation_signal> () }
{
}

cancellation (cancellation && other) = default;

cancellation & operator= (cancellation && other)
{
// Can only move if the strands are the same
debug_assert (strand == other.strand);

if (this != &other)
{
signal = std::move (other.signal);
}
return *this;
};

public:
void emit (asio::cancellation_type type = asio::cancellation_type::all)
{
asio::dispatch (strand, asio::use_future ([this, type] () {
signal.emit (type);
signal->emit (type);
}))
.wait ();
}
Expand All @@ -43,13 +61,92 @@ class cancellation
{
// Ensure that the slot is only connected once
debug_assert (std::exchange (slotted, true) == false);
return signal.slot ();
return signal->slot ();
}

private:
nano::async::strand & strand;
asio::cancellation_signal signal;

private:
std::unique_ptr<asio::cancellation_signal> signal; // Wrap the signal in a unique_ptr to enable moving

bool slotted{ false };
};

/**
* Wrapper with convenience functions and safety checks for asynchronous tasks.
* Aims to provide interface similar to std::thread.
*/
class task
{
public:
// Only thread-like void tasks are supported for now
using value_type = void;

task (nano::async::strand & strand) :
strand{ strand },
cancellation{ strand }
{
}

task (nano::async::strand & strand, auto && func) :
strand{ strand },
cancellation{ strand }
{
future = asio::co_spawn (
strand,
std::forward<decltype (func)> (func),
asio::bind_cancellation_slot (cancellation.slot (), asio::use_future));
}

~task ()
{
release_assert (!joinable () || ready (), "async task not joined before destruction");
}

task (task && other) = default;

task & operator= (task && other)
{
// Can only move if the strands are the same
debug_assert (strand == other.strand);

if (this != &other)
{
future = std::move (other.future);
cancellation = std::move (other.cancellation);
}
return *this;
}

public:
bool joinable () const
{
return future.valid ();
}

bool ready () const
{
release_assert (future.valid ());
return future.wait_for (std::chrono::seconds{ 0 }) == std::future_status::ready;
}

void join ()
{
release_assert (future.valid ());
future.wait ();
future = {};
}

void cancel ()
{
debug_assert (joinable ());
cancellation.emit ();
}

nano::async::strand & strand;

private:
std::future<value_type> future;
nano::async::cancellation cancellation;
};
}
19 changes: 9 additions & 10 deletions nano/node/transport/tcp_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_
port{ port_a },
max_inbound_connections{ max_inbound_connections },
strand{ node_a.io_ctx.get_executor () },
cancellation{ strand },
acceptor{ strand }
acceptor{ strand },
task{ strand }
{
connection_accepted.add ([this] (auto const & socket, auto const & server) {
node.observers.socket_accepted.notify (*socket);
Expand All @@ -34,13 +34,13 @@ nano::transport::tcp_listener::~tcp_listener ()
{
// Thread should be stopped before destruction
debug_assert (!cleanup_thread.joinable ());
debug_assert (!future.valid () || future.wait_for (0s) == std::future_status::ready);
debug_assert (!task.joinable ());
}

void nano::transport::tcp_listener::start ()
{
debug_assert (!cleanup_thread.joinable ());
debug_assert (!future.valid ());
debug_assert (!task.joinable ());

try
{
Expand All @@ -64,8 +64,7 @@ void nano::transport::tcp_listener::start ()
throw;
}

future = asio::co_spawn (
strand, [this] () -> asio::awaitable<void> {
task = nano::async::task (strand, [this] () -> asio::awaitable<void> {
try
{
logger.debug (nano::log::type::tcp_listener, "Starting acceptor");
Expand All @@ -92,7 +91,7 @@ void nano::transport::tcp_listener::start ()
{
logger.critical (nano::log::type::tcp_listener, "Unknown error");
release_assert (false); // Unexpected error
} }, asio::bind_cancellation_slot (cancellation.slot (), asio::use_future));
} });

cleanup_thread = std::thread ([this] {
nano::thread_role::set (nano::thread_role::name::tcp_listener);
Expand All @@ -113,10 +112,10 @@ void nano::transport::tcp_listener::stop ()
}
condition.notify_all ();

if (future.valid ())
if (task.joinable ())
{
cancellation.emit ();
future.wait ();
task.cancel ();
task.join ();
}
if (cleanup_thread.joinable ())
{
Expand Down
3 changes: 1 addition & 2 deletions nano/node/transport/tcp_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,14 @@ class tcp_listener final
ordered_connections connections;

nano::async::strand strand;
nano::async::cancellation cancellation;

asio::ip::tcp::acceptor acceptor;
asio::ip::tcp::endpoint local;

std::atomic<bool> stopped;
nano::condition_variable condition;
mutable nano::mutex mutex;
std::future<void> future;
nano::async::task task;
std::thread cleanup_thread;
};
}
Loading