Skip to content

Commit

Permalink
Merge pull request #261 from dmitryikh/fix_wait_queue_asserts
Browse files Browse the repository at this point in the history
introduce waker and wait_queue abstractions. fixes #251, #259
  • Loading branch information
olk authored Oct 14, 2020
2 parents e440623 + bf39f57 commit 3194349
Show file tree
Hide file tree
Showing 20 changed files with 319 additions and 794 deletions.
1 change: 1 addition & 0 deletions build/Jamfile.v2
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ lib boost_fiber
condition_variable.cpp
context.cpp
fiber.cpp
waker.cpp
future.cpp
mutex.cpp
properties.cpp
Expand Down
286 changes: 23 additions & 263 deletions include/boost/fiber/buffered_channel.hpp

Large diffs are not rendered by default.

29 changes: 5 additions & 24 deletions include/boost/fiber/condition_variable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <boost/fiber/exceptions.hpp>
#include <boost/fiber/mutex.hpp>
#include <boost/fiber/operations.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -44,10 +45,8 @@ enum class cv_status {

class BOOST_FIBERS_DECL condition_variable_any {
private:
using wait_queue_t = context::wait_queue_t;

detail::spinlock wait_queue_splk_{};
wait_queue_t wait_queue_{};
wait_queue wait_queue_{};

public:
condition_variable_any() = default;
Expand All @@ -69,13 +68,9 @@ class BOOST_FIBERS_DECL condition_variable_any {
// atomically call lt.unlock() and block on *this
// store this fiber in waiting-queue
detail::spinlock_lock lk{ wait_queue_splk_ };
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
active_ctx->wait_link( wait_queue_);
active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// unlock external lt
lt.unlock();
// suspend this fiber
active_ctx->suspend( lk);
wait_queue_.suspend_and_wait( lk, active_ctx);

// relock external again before returning
try {
lt.lock();
Expand All @@ -86,8 +81,6 @@ class BOOST_FIBERS_DECL condition_variable_any {
} catch (...) {
std::terminate();
}
// post-conditions
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
}

template< typename LockType, typename Pred >
Expand All @@ -105,20 +98,10 @@ class BOOST_FIBERS_DECL condition_variable_any {
// atomically call lt.unlock() and block on *this
// store this fiber in waiting-queue
detail::spinlock_lock lk{ wait_queue_splk_ };
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
active_ctx->wait_link( wait_queue_);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// unlock external lt
lt.unlock();
// suspend this fiber
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
if ( ! wait_queue_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
status = cv_status::timeout;
// relock local lk
lk.lock();
// remove from waiting-queue
wait_queue_.remove( * active_ctx);
// unlock local lk
lk.unlock();
}
// relock external again before returning
try {
Expand All @@ -130,8 +113,6 @@ class BOOST_FIBERS_DECL condition_variable_any {
} catch (...) {
std::terminate();
}
// post-conditions
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
return status;
}

Expand Down
84 changes: 14 additions & 70 deletions include/boost/fiber/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <boost/fiber/properties.hpp>
#include <boost/fiber/segmented_stack.hpp>
#include <boost/fiber/type.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -63,31 +64,6 @@ class scheduler;

namespace detail {

struct wait_tag;
typedef intrusive::list_member_hook<
intrusive::tag< wait_tag >,
intrusive::link_mode<
intrusive::auto_unlink
>
> wait_hook;
// declaration of the functor that converts between
// the context class and the wait-hook
struct wait_functor {
// required types
using hook_type = wait_hook;
using hook_ptr = hook_type *;
using const_hook_ptr = const hook_type *;
using value_type = context;
using pointer = value_type *;
using const_pointer = const value_type *;

// required static functions
static hook_ptr to_hook_ptr( value_type &value);
static const_hook_ptr to_hook_ptr( value_type const& value);
static pointer to_value_ptr( hook_ptr n);
static const_pointer to_value_ptr( const_hook_ptr n);
};

struct ready_tag;
typedef intrusive::list_member_hook<
intrusive::tag< ready_tag >,
Expand Down Expand Up @@ -131,13 +107,6 @@ typedef intrusive::slist_member_hook<
}

class BOOST_FIBERS_DECL context {
public:
typedef intrusive::list<
context,
intrusive::function_hook< detail::wait_functor >,
intrusive::constant_time_size< false >
> wait_queue_t;

private:
friend class dispatcher_context;
friend class main_context;
Expand Down Expand Up @@ -174,16 +143,16 @@ class BOOST_FIBERS_DECL context {
#endif
detail::spinlock splk_{};
bool terminated_{ false };
wait_queue_t wait_queue_{};
wait_queue wait_queue_{};
public:
detail::wait_hook wait_hook_{};
#if ! defined(BOOST_FIBERS_NO_ATOMICS)
std::atomic< std::intptr_t > twstatus{ 0 };
std::atomic<size_t> waker_epoch_{ 0 };
#endif
private:
scheduler * scheduler_{ nullptr };
fss_data_t fss_data_{};
detail::sleep_hook sleep_hook_{};
waker sleep_waker_{};
detail::ready_hook ready_hook_{};
detail::terminated_hook terminated_hook_{};
detail::worker_hook worker_hook_{};
Expand Down Expand Up @@ -305,7 +274,15 @@ class BOOST_FIBERS_DECL context {

bool wait_until( std::chrono::steady_clock::time_point const&) noexcept;
bool wait_until( std::chrono::steady_clock::time_point const&,
detail::spinlock_lock &) noexcept;
detail::spinlock_lock &,
waker &&) noexcept;

bool wake(const size_t) noexcept;

waker create_waker() noexcept {
// this operation makes all previously created wakers to be outdated
return { this, ++waker_epoch_ };
}

void schedule( context *) noexcept;

Expand Down Expand Up @@ -341,8 +318,6 @@ class BOOST_FIBERS_DECL context {

bool terminated_is_linked() const noexcept;

bool wait_is_linked() const noexcept;

template< typename List >
void worker_link( List & lst) noexcept {
static_assert( std::is_same< typename List::value_traits::hook_type, detail::worker_hook >::value, "not a worker-queue");
Expand Down Expand Up @@ -378,21 +353,12 @@ class BOOST_FIBERS_DECL context {
lst.push_back( * this);
}

template< typename List >
void wait_link( List & lst) noexcept {
static_assert( std::is_same< typename List::value_traits::hook_type, detail::wait_hook >::value, "not a wait-queue");
BOOST_ASSERT( ! wait_is_linked() );
lst.push_back( * this);
}

void worker_unlink() noexcept;

void ready_unlink() noexcept;

void sleep_unlink() noexcept;

void wait_unlink() noexcept;

void detach() noexcept;

void attach( context *) noexcept;
Expand Down Expand Up @@ -524,29 +490,7 @@ static intrusive_ptr< context > make_worker_context( launch policy,
std::forward< Arg >( arg) ... } };
}

namespace detail {

inline
wait_functor::hook_ptr wait_functor::to_hook_ptr( wait_functor::value_type & value) {
return & value.wait_hook_;
}

inline
wait_functor::const_hook_ptr wait_functor::to_hook_ptr( wait_functor::value_type const& value) {
return & value.wait_hook_;
}

inline
wait_functor::pointer wait_functor::to_value_ptr( wait_functor::hook_ptr n) {
return intrusive::get_parent_from_member< context >( n, & context::wait_hook_);
}

inline
wait_functor::const_pointer wait_functor::to_value_ptr( wait_functor::const_hook_ptr n) {
return intrusive::get_parent_from_member< context >( n, & context::wait_hook_);
}

}}}
}}

#ifdef _MSC_VER
# pragma warning(pop)
Expand Down
5 changes: 2 additions & 3 deletions include/boost/fiber/mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -33,10 +34,8 @@ class BOOST_FIBERS_DECL mutex {
private:
friend class condition_variable;

using wait_queue_type = context::wait_queue_t;

detail::spinlock wait_queue_splk_{};
wait_queue_type wait_queue_{};
wait_queue wait_queue_{};
context * owner_{ nullptr };

public:
Expand Down
2 changes: 0 additions & 2 deletions include/boost/fiber/operations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,12 @@ template< typename Clock, typename Duration >
void sleep_until( std::chrono::time_point< Clock, Duration > const& sleep_time_) {
std::chrono::steady_clock::time_point sleep_time = boost::fibers::detail::convert( sleep_time_);
fibers::context * active_ctx = fibers::context::active();
active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
active_ctx->wait_until( sleep_time);
}

template< typename Rep, typename Period >
void sleep_for( std::chrono::duration< Rep, Period > const& timeout_duration) {
fibers::context * active_ctx = fibers::context::active();
active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
active_ctx->wait_until( std::chrono::steady_clock::now() + timeout_duration);
}

Expand Down
5 changes: 2 additions & 3 deletions include/boost/fiber/recursive_mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -37,10 +38,8 @@ class BOOST_FIBERS_DECL recursive_mutex {
private:
friend class condition_variable;

using wait_queue_type = context::wait_queue_t;

detail::spinlock wait_queue_splk_{};
wait_queue_type wait_queue_{};
wait_queue wait_queue_{};
context * owner_{ nullptr };
std::size_t count_{ 0 };

Expand Down
5 changes: 2 additions & 3 deletions include/boost/fiber/recursive_timed_mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -39,10 +40,8 @@ class BOOST_FIBERS_DECL recursive_timed_mutex {
private:
friend class condition_variable;

using wait_queue_type = context::wait_queue_t;

detail::spinlock wait_queue_splk_{};
wait_queue_type wait_queue_{};
wait_queue wait_queue_{};
context * owner_{ nullptr };
std::size_t count_{ 0 };

Expand Down
4 changes: 3 additions & 1 deletion include/boost/fiber/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,11 @@ class BOOST_FIBERS_DECL scheduler {

bool wait_until( context *,
std::chrono::steady_clock::time_point const&) noexcept;

bool wait_until( context *,
std::chrono::steady_clock::time_point const&,
detail::spinlock_lock &) noexcept;
detail::spinlock_lock &,
waker &&) noexcept;

void suspend() noexcept;
void suspend( detail::spinlock_lock &) noexcept;
Expand Down
5 changes: 2 additions & 3 deletions include/boost/fiber/timed_mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/waker.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
Expand All @@ -35,10 +36,8 @@ class BOOST_FIBERS_DECL timed_mutex {
private:
friend class condition_variable;

using wait_queue_type = context::wait_queue_t;

detail::spinlock wait_queue_splk_{};
wait_queue_type wait_queue_{};
wait_queue wait_queue_{};
context * owner_{ nullptr };

bool try_lock_until_( std::chrono::steady_clock::time_point const& timeout_time) noexcept;
Expand Down
Loading

0 comments on commit 3194349

Please sign in to comment.