Skip to content

Commit

Permalink
Async task helper (#4590)
Browse files Browse the repository at this point in the history
* Introduce async task class

* Use in `tcp_listener`

* Comments

* Tests

* Use constructor for spawning
  • Loading branch information
pwojcikdev authored Apr 28, 2024
1 parent 5ef471f commit 3f61cd9
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 17 deletions.
62 changes: 62 additions & 0 deletions nano/core_test/async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,66 @@ TEST (async, cancellation)

ASSERT_EQ (fut.wait_for (500ms), std::future_status::ready);
ASSERT_NO_THROW (fut.get ());
}

TEST (async, task)
{
nano::test::system system;

auto io_ctx = std::make_shared<asio::io_context> ();
nano::thread_runner runner{ io_ctx, 1 };
nano::async::strand strand{ io_ctx->get_executor () };

nano::async::task task{ strand };

// Default state, empty task
ASSERT_FALSE (task.joinable ());

task = nano::async::task (strand, [&] () -> asio::awaitable<void> {
co_await nano::async::sleep_for (500ms);
});

// Task should now be joinable, but not ready
ASSERT_TRUE (task.joinable ());
ASSERT_FALSE (task.ready ());

WAIT (50ms);
ASSERT_TRUE (task.joinable ());
ASSERT_FALSE (task.ready ());

WAIT (1s);

// Task completed, not yet joined
ASSERT_TRUE (task.joinable ());
ASSERT_TRUE (task.ready ());

task.join ();

ASSERT_FALSE (task.joinable ());
}

TEST (async, task_cancel)
{
nano::test::system system;

auto io_ctx = std::make_shared<asio::io_context> ();
nano::thread_runner runner{ io_ctx, 1 };
nano::async::strand strand{ io_ctx->get_executor () };

nano::async::task task = nano::async::task (strand, [&] () -> asio::awaitable<void> {
co_await nano::async::sleep_for (10s);
});

// Task should be joinable, but not ready
WAIT (100ms);
ASSERT_TRUE (task.joinable ());
ASSERT_FALSE (task.ready ());

task.cancel ();

WAIT (500ms);
ASSERT_TRUE (task.joinable ());
ASSERT_TRUE (task.ready ());

// It should not be necessary to join a ready task
}
107 changes: 102 additions & 5 deletions nano/lib/async.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include <boost/asio.hpp>

#include <future>

namespace asio = boost::asio;

namespace nano::async
Expand All @@ -27,14 +29,30 @@ class cancellation
{
public:
explicit cancellation (nano::async::strand & strand) :
strand{ strand }
strand{ strand },
signal{ std::make_unique<asio::cancellation_signal> () }
{
}

cancellation (cancellation && other) = default;

cancellation & operator= (cancellation && other)
{
// Can only move if the strands are the same
debug_assert (strand == other.strand);

if (this != &other)
{
signal = std::move (other.signal);
}
return *this;
};

public:
void emit (asio::cancellation_type type = asio::cancellation_type::all)
{
asio::dispatch (strand, asio::use_future ([this, type] () {
signal.emit (type);
signal->emit (type);
}))
.wait ();
}
Expand All @@ -43,13 +61,92 @@ class cancellation
{
// Ensure that the slot is only connected once
debug_assert (std::exchange (slotted, true) == false);
return signal.slot ();
return signal->slot ();
}

private:
nano::async::strand & strand;
asio::cancellation_signal signal;

private:
std::unique_ptr<asio::cancellation_signal> signal; // Wrap the signal in a unique_ptr to enable moving

bool slotted{ false };
};

/**
* Wrapper with convenience functions and safety checks for asynchronous tasks.
* Aims to provide interface similar to std::thread.
*/
class task
{
public:
// Only thread-like void tasks are supported for now
using value_type = void;

task (nano::async::strand & strand) :
strand{ strand },
cancellation{ strand }
{
}

task (nano::async::strand & strand, auto && func) :
strand{ strand },
cancellation{ strand }
{
future = asio::co_spawn (
strand,
std::forward<decltype (func)> (func),
asio::bind_cancellation_slot (cancellation.slot (), asio::use_future));
}

~task ()
{
release_assert (!joinable () || ready (), "async task not joined before destruction");
}

task (task && other) = default;

task & operator= (task && other)
{
// Can only move if the strands are the same
debug_assert (strand == other.strand);

if (this != &other)
{
future = std::move (other.future);
cancellation = std::move (other.cancellation);
}
return *this;
}

public:
bool joinable () const
{
return future.valid ();
}

bool ready () const
{
release_assert (future.valid ());
return future.wait_for (std::chrono::seconds{ 0 }) == std::future_status::ready;
}

void join ()
{
release_assert (future.valid ());
future.wait ();
future = {};
}

void cancel ()
{
debug_assert (joinable ());
cancellation.emit ();
}

nano::async::strand & strand;

private:
std::future<value_type> future;
nano::async::cancellation cancellation;
};
}
19 changes: 9 additions & 10 deletions nano/node/transport/tcp_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_
port{ port_a },
max_inbound_connections{ max_inbound_connections },
strand{ node_a.io_ctx.get_executor () },
cancellation{ strand },
acceptor{ strand }
acceptor{ strand },
task{ strand }
{
connection_accepted.add ([this] (auto const & socket, auto const & server) {
node.observers.socket_accepted.notify (*socket);
Expand All @@ -34,13 +34,13 @@ nano::transport::tcp_listener::~tcp_listener ()
{
// Thread should be stopped before destruction
debug_assert (!cleanup_thread.joinable ());
debug_assert (!future.valid () || future.wait_for (0s) == std::future_status::ready);
debug_assert (!task.joinable ());
}

void nano::transport::tcp_listener::start ()
{
debug_assert (!cleanup_thread.joinable ());
debug_assert (!future.valid ());
debug_assert (!task.joinable ());

try
{
Expand All @@ -64,8 +64,7 @@ void nano::transport::tcp_listener::start ()
throw;
}

future = asio::co_spawn (
strand, [this] () -> asio::awaitable<void> {
task = nano::async::task (strand, [this] () -> asio::awaitable<void> {
try
{
logger.debug (nano::log::type::tcp_listener, "Starting acceptor");
Expand All @@ -92,7 +91,7 @@ void nano::transport::tcp_listener::start ()
{
logger.critical (nano::log::type::tcp_listener, "Unknown error");
release_assert (false); // Unexpected error
} }, asio::bind_cancellation_slot (cancellation.slot (), asio::use_future));
} });

cleanup_thread = std::thread ([this] {
nano::thread_role::set (nano::thread_role::name::tcp_listener);
Expand All @@ -113,10 +112,10 @@ void nano::transport::tcp_listener::stop ()
}
condition.notify_all ();

if (future.valid ())
if (task.joinable ())
{
cancellation.emit ();
future.wait ();
task.cancel ();
task.join ();
}
if (cleanup_thread.joinable ())
{
Expand Down
3 changes: 1 addition & 2 deletions nano/node/transport/tcp_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,14 @@ class tcp_listener final
ordered_connections connections;

nano::async::strand strand;
nano::async::cancellation cancellation;

asio::ip::tcp::acceptor acceptor;
asio::ip::tcp::endpoint local;

std::atomic<bool> stopped;
nano::condition_variable condition;
mutable nano::mutex mutex;
std::future<void> future;
nano::async::task task;
std::thread cleanup_thread;
};
}

0 comments on commit 3f61cd9

Please sign in to comment.