Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce waker and wait_queue abstractions. fixes #251, #259 #261

Merged
merged 2 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build/Jamfile.v2
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ lib boost_fiber
condition_variable.cpp
context.cpp
fiber.cpp
waker.cpp
future.cpp
mutex.cpp
properties.cpp
Expand Down
286 changes: 23 additions & 263 deletions include/boost/fiber/buffered_channel.hpp

Large diffs are not rendered by default.

29 changes: 5 additions & 24 deletions include/boost/fiber/condition_variable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <boost/fiber/exceptions.hpp>
#include <boost/fiber/mutex.hpp>
#include <boost/fiber/operations.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -44,10 +45,8 @@ enum class cv_status {

class BOOST_FIBERS_DECL condition_variable_any {
private:
using wait_queue_t = context::wait_queue_t;

detail::spinlock wait_queue_splk_{};
wait_queue_t wait_queue_{};
wait_queue wait_queue_{};

public:
condition_variable_any() = default;
Expand All @@ -69,13 +68,9 @@ class BOOST_FIBERS_DECL condition_variable_any {
// atomically call lt.unlock() and block on *this
// store this fiber in waiting-queue
detail::spinlock_lock lk{ wait_queue_splk_ };
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
active_ctx->wait_link( wait_queue_);
active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// unlock external lt
lt.unlock();
// suspend this fiber
active_ctx->suspend( lk);
wait_queue_.suspend_and_wait( lk, active_ctx);

// relock external again before returning
try {
lt.lock();
Expand All @@ -86,8 +81,6 @@ class BOOST_FIBERS_DECL condition_variable_any {
} catch (...) {
std::terminate();
}
// post-conditions
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
}

template< typename LockType, typename Pred >
Expand All @@ -105,20 +98,10 @@ class BOOST_FIBERS_DECL condition_variable_any {
// atomically call lt.unlock() and block on *this
// store this fiber in waiting-queue
detail::spinlock_lock lk{ wait_queue_splk_ };
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
active_ctx->wait_link( wait_queue_);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// unlock external lt
lt.unlock();
// suspend this fiber
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
if ( ! wait_queue_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
status = cv_status::timeout;
// relock local lk
lk.lock();
// remove from waiting-queue
wait_queue_.remove( * active_ctx);
// unlock local lk
lk.unlock();
}
// relock external again before returning
try {
Expand All @@ -130,8 +113,6 @@ class BOOST_FIBERS_DECL condition_variable_any {
} catch (...) {
std::terminate();
}
// post-conditions
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
return status;
}

Expand Down
84 changes: 14 additions & 70 deletions include/boost/fiber/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <boost/fiber/properties.hpp>
#include <boost/fiber/segmented_stack.hpp>
#include <boost/fiber/type.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -63,31 +64,6 @@ class scheduler;

namespace detail {

struct wait_tag;
typedef intrusive::list_member_hook<
intrusive::tag< wait_tag >,
intrusive::link_mode<
intrusive::auto_unlink
>
> wait_hook;
// declaration of the functor that converts between
// the context class and the wait-hook
struct wait_functor {
// required types
using hook_type = wait_hook;
using hook_ptr = hook_type *;
using const_hook_ptr = const hook_type *;
using value_type = context;
using pointer = value_type *;
using const_pointer = const value_type *;

// required static functions
static hook_ptr to_hook_ptr( value_type &value);
static const_hook_ptr to_hook_ptr( value_type const& value);
static pointer to_value_ptr( hook_ptr n);
static const_pointer to_value_ptr( const_hook_ptr n);
};

struct ready_tag;
typedef intrusive::list_member_hook<
intrusive::tag< ready_tag >,
Expand Down Expand Up @@ -131,13 +107,6 @@ typedef intrusive::slist_member_hook<
}

class BOOST_FIBERS_DECL context {
public:
typedef intrusive::list<
context,
intrusive::function_hook< detail::wait_functor >,
intrusive::constant_time_size< false >
> wait_queue_t;

private:
friend class dispatcher_context;
friend class main_context;
Expand Down Expand Up @@ -174,16 +143,16 @@ class BOOST_FIBERS_DECL context {
#endif
detail::spinlock splk_{};
bool terminated_{ false };
wait_queue_t wait_queue_{};
wait_queue wait_queue_{};
public:
detail::wait_hook wait_hook_{};
#if ! defined(BOOST_FIBERS_NO_ATOMICS)
std::atomic< std::intptr_t > twstatus{ 0 };
std::atomic<size_t> waker_epoch_{ 0 };
#endif
private:
scheduler * scheduler_{ nullptr };
fss_data_t fss_data_{};
detail::sleep_hook sleep_hook_{};
waker sleep_waker_{};
detail::ready_hook ready_hook_{};
detail::terminated_hook terminated_hook_{};
detail::worker_hook worker_hook_{};
Expand Down Expand Up @@ -305,7 +274,15 @@ class BOOST_FIBERS_DECL context {

bool wait_until( std::chrono::steady_clock::time_point const&) noexcept;
bool wait_until( std::chrono::steady_clock::time_point const&,
detail::spinlock_lock &) noexcept;
detail::spinlock_lock &,
waker &&) noexcept;

bool wake(const size_t) noexcept;

waker create_waker() noexcept {
// this operation makes all previously created wakers to be outdated
return { this, ++waker_epoch_ };
}

void schedule( context *) noexcept;

Expand Down Expand Up @@ -341,8 +318,6 @@ class BOOST_FIBERS_DECL context {

bool terminated_is_linked() const noexcept;

bool wait_is_linked() const noexcept;

template< typename List >
void worker_link( List & lst) noexcept {
static_assert( std::is_same< typename List::value_traits::hook_type, detail::worker_hook >::value, "not a worker-queue");
Expand Down Expand Up @@ -378,21 +353,12 @@ class BOOST_FIBERS_DECL context {
lst.push_back( * this);
}

template< typename List >
void wait_link( List & lst) noexcept {
static_assert( std::is_same< typename List::value_traits::hook_type, detail::wait_hook >::value, "not a wait-queue");
BOOST_ASSERT( ! wait_is_linked() );
lst.push_back( * this);
}

void worker_unlink() noexcept;

void ready_unlink() noexcept;

void sleep_unlink() noexcept;

void wait_unlink() noexcept;

void detach() noexcept;

void attach( context *) noexcept;
Expand Down Expand Up @@ -524,29 +490,7 @@ static intrusive_ptr< context > make_worker_context( launch policy,
std::forward< Arg >( arg) ... } };
}

namespace detail {

inline
wait_functor::hook_ptr wait_functor::to_hook_ptr( wait_functor::value_type & value) {
return & value.wait_hook_;
}

inline
wait_functor::const_hook_ptr wait_functor::to_hook_ptr( wait_functor::value_type const& value) {
return & value.wait_hook_;
}

inline
wait_functor::pointer wait_functor::to_value_ptr( wait_functor::hook_ptr n) {
return intrusive::get_parent_from_member< context >( n, & context::wait_hook_);
}

inline
wait_functor::const_pointer wait_functor::to_value_ptr( wait_functor::const_hook_ptr n) {
return intrusive::get_parent_from_member< context >( n, & context::wait_hook_);
}

}}}
}}

#ifdef _MSC_VER
# pragma warning(pop)
Expand Down
5 changes: 2 additions & 3 deletions include/boost/fiber/mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -33,10 +34,8 @@ class BOOST_FIBERS_DECL mutex {
private:
friend class condition_variable;

using wait_queue_type = context::wait_queue_t;

detail::spinlock wait_queue_splk_{};
wait_queue_type wait_queue_{};
wait_queue wait_queue_{};
context * owner_{ nullptr };

public:
Expand Down
2 changes: 0 additions & 2 deletions include/boost/fiber/operations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,12 @@ template< typename Clock, typename Duration >
void sleep_until( std::chrono::time_point< Clock, Duration > const& sleep_time_) {
std::chrono::steady_clock::time_point sleep_time = boost::fibers::detail::convert( sleep_time_);
fibers::context * active_ctx = fibers::context::active();
active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
active_ctx->wait_until( sleep_time);
}

template< typename Rep, typename Period >
void sleep_for( std::chrono::duration< Rep, Period > const& timeout_duration) {
fibers::context * active_ctx = fibers::context::active();
active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
active_ctx->wait_until( std::chrono::steady_clock::now() + timeout_duration);
}

Expand Down
5 changes: 2 additions & 3 deletions include/boost/fiber/recursive_mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -37,10 +38,8 @@ class BOOST_FIBERS_DECL recursive_mutex {
private:
friend class condition_variable;

using wait_queue_type = context::wait_queue_t;

detail::spinlock wait_queue_splk_{};
wait_queue_type wait_queue_{};
wait_queue wait_queue_{};
context * owner_{ nullptr };
std::size_t count_{ 0 };

Expand Down
5 changes: 2 additions & 3 deletions include/boost/fiber/recursive_timed_mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -39,10 +40,8 @@ class BOOST_FIBERS_DECL recursive_timed_mutex {
private:
friend class condition_variable;

using wait_queue_type = context::wait_queue_t;

detail::spinlock wait_queue_splk_{};
wait_queue_type wait_queue_{};
wait_queue wait_queue_{};
context * owner_{ nullptr };
std::size_t count_{ 0 };

Expand Down
4 changes: 3 additions & 1 deletion include/boost/fiber/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,11 @@ class BOOST_FIBERS_DECL scheduler {

bool wait_until( context *,
std::chrono::steady_clock::time_point const&) noexcept;

bool wait_until( context *,
std::chrono::steady_clock::time_point const&,
detail::spinlock_lock &) noexcept;
detail::spinlock_lock &,
waker &&) noexcept;

void suspend() noexcept;
void suspend( detail::spinlock_lock &) noexcept;
Expand Down
5 changes: 2 additions & 3 deletions include/boost/fiber/timed_mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -35,10 +36,8 @@ class BOOST_FIBERS_DECL timed_mutex {
private:
friend class condition_variable;

using wait_queue_type = context::wait_queue_t;

detail::spinlock wait_queue_splk_{};
wait_queue_type wait_queue_{};
wait_queue wait_queue_{};
context * owner_{ nullptr };

bool try_lock_until_( std::chrono::steady_clock::time_point const& timeout_time) noexcept;
Expand Down
Loading