Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Requiring executor tasks are noexcept. #528

Merged
merged 17 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/stlab.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
pull_request:
push:
branches:
- main
- main

jobs:
generate-matrix:
Expand Down Expand Up @@ -67,7 +67,7 @@ jobs:
./emsdk install latest
./emsdk activate latest
echo 'source "$HOME/emsdk/emsdk_env.sh"' >> $HOME/.bash_profile

# Override Emsdk's bundled node (14.18.2) to the GH Actions system installation (>= 16.16.0)
sed -i "/^NODE_JS = .*/c\NODE_JS = '`which node`'" .emscripten
echo "Overwrote .emscripten config file to:"
Expand Down Expand Up @@ -122,7 +122,7 @@ jobs:
run: |
call "C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Auxiliary\Build\vcvarsall.bat" x64
mkdir ..\build
cmake -S. -B../build -GNinja -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_STANDARD=23 -DCMAKE_TOOLCHAIN_FILE=C:/vcpkg/scripts/buildsystems/vcpkg.cmake
cmake -S. -B../build -GNinja -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_STANDARD=20 -DCMAKE_TOOLCHAIN_FILE=C:/vcpkg/scripts/buildsystems/vcpkg.cmake

- name: Build // Unix
if: ${{ startsWith(matrix.config.os, 'ubuntu') || startsWith(matrix.config.os, 'macos') }}
Expand Down
128 changes: 63 additions & 65 deletions stlab/concurrency/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ auto set_process_error(P& process, std::exception_ptr&& error)
}

template <typename P>
auto set_process_error(P&, std::exception_ptr &&)
auto set_process_error(P&, std::exception_ptr&&)
-> std::enable_if_t<!has_set_process_error_v<unwrap_reference_t<P>>, void> {}

/**************************************************************************************************/
Expand Down Expand Up @@ -597,8 +597,8 @@ template <typename Q, typename T, typename R, typename Arg, std::size_t I, typen
struct shared_process_sender_indexed : public shared_process_sender<Arg> {
shared_process<Q, T, R, Args...>& _shared_process;

explicit
shared_process_sender_indexed(shared_process<Q, T, R, Args...>& sp) : _shared_process(sp) {}
explicit shared_process_sender_indexed(shared_process<Q, T, R, Args...>& sp) :
_shared_process(sp) {}

void add_sender() override { ++_shared_process._sender_count; }

Expand Down Expand Up @@ -652,9 +652,7 @@ struct shared_process_sender_helper;
template <typename Q, typename T, typename R, std::size_t... I, typename... Args>
struct shared_process_sender_helper<Q, T, R, std::index_sequence<I...>, Args...>
: shared_process_sender_indexed<Q, T, R, Args, I, Args...>... {

explicit
shared_process_sender_helper(shared_process<Q, T, R, Args...>& sp) :
explicit shared_process_sender_helper(shared_process<Q, T, R, Args...>& sp) :
shared_process_sender_indexed<Q, T, R, Args, I, Args...>(sp)... {}
};

Expand Down Expand Up @@ -768,8 +766,6 @@ struct shared_process

const std::tuple<std::shared_ptr<shared_process_receiver<Args>>...> _upstream;



template <typename E, typename F>
shared_process(E&& e, F&& f) :
shared_process_sender_helper<Q, T, R, std::make_index_sequence<sizeof...(Args)>, Args...>(
Expand Down Expand Up @@ -954,12 +950,12 @@ struct shared_process
std::chrono::nanoseconds::min())
broadcast(unwrap(*_process).yield());
else
execute_at(duration,
_executor)([_weak_this = make_weak_ptr(this->shared_from_this())] {
auto _this = _weak_this.lock();
if (!_this) return;
_this->try_broadcast();
});
execute_at(duration, _executor)(
[_weak_this = make_weak_ptr(this->shared_from_this())]() noexcept {
auto _this = _weak_this.lock();
if (!_this) return;
_this->try_broadcast();
});
}

/*
Expand All @@ -978,25 +974,25 @@ struct shared_process
} else {
/* Schedule a timeout. */
_timeout_function_active = true;
execute_at(duration,
_executor)([_weak_this = make_weak_ptr(this->shared_from_this())] {
auto _this = _weak_this.lock();
// It may be that the complete channel is gone in the meanwhile
if (!_this) return;

// try_lock can fail spuriously
while (true) {
lock_t lock(_this->_timeout_function_control, std::try_to_lock);
if (!lock) continue;

// we were cancelled
if (get_process_state(_this->_process).first != process_state::yield) {
_this->try_broadcast();
_this->_timeout_function_active = false;
execute_at(duration, _executor)(
[_weak_this = make_weak_ptr(this->shared_from_this())]() noexcept {
auto _this = _weak_this.lock();
// It may be that the complete channel is gone in the meanwhile
if (!_this) return;

// try_lock can fail spuriously
while (true) {
lock_t lock(_this->_timeout_function_control, std::try_to_lock);
if (!lock) continue;

// we were cancelled
if (get_process_state(_this->_process).first != process_state::yield) {
_this->try_broadcast();
_this->_timeout_function_active = false;
}
return;
}
return;
}
});
});
}
} catch (...) { // this catches exceptions during _process.await() and _process.yield()
broadcast(std::move(std::current_exception()));
Expand Down Expand Up @@ -1038,12 +1034,12 @@ struct shared_process
std::chrono::nanoseconds::min())
broadcast(unwrap(*_process).yield());
else
execute_at(duration,
_executor)([_weak_this = make_weak_ptr(this->shared_from_this())] {
auto _this = _weak_this.lock();
if (!_this) return;
_this->try_broadcast();
});
execute_at(duration, _executor)(
[_weak_this = make_weak_ptr(this->shared_from_this())]() noexcept {
auto _this = _weak_this.lock();
if (!_this) return;
_this->try_broadcast();
});
}

/*
Expand All @@ -1062,25 +1058,25 @@ struct shared_process
} else {
/* Schedule a timeout. */
_timeout_function_active = true;
execute_at(duration,
_executor)([_weak_this = make_weak_ptr(this->shared_from_this())] {
auto _this = _weak_this.lock();
// It may be that the complete channel is gone in the meanwhile
if (!_this) return;

// try_lock can fail spuriously
while (true) {
lock_t lock(_this->_timeout_function_control, std::try_to_lock);
if (!lock) continue;

// we were cancelled
if (get_process_state(_this->_process).first != process_state::yield) {
_this->try_broadcast();
_this->_timeout_function_active = false;
execute_at(duration, _executor)(
[_weak_this = make_weak_ptr(this->shared_from_this())]() noexcept {
auto _this = _weak_this.lock();
// It may be that the complete channel is gone in the meanwhile
if (!_this) return;

// try_lock can fail spuriously
while (true) {
lock_t lock(_this->_timeout_function_control, std::try_to_lock);
if (!lock) continue;

// we were cancelled
if (get_process_state(_this->_process).first != process_state::yield) {
_this->try_broadcast();
_this->_timeout_function_active = false;
}
return;
}
return;
}
});
});
}
} catch (...) { // this catches exceptions during _process.await() and _process.yield()
broadcast(std::move(std::current_exception()));
Expand All @@ -1098,10 +1094,12 @@ struct shared_process
REVISIT (sparent) : See above comments on step() and ensure consistency.

What is this code doing, if we don't have a yield then it also assumes no await?

This seems to be doing a lot for a (required) noexcept operation - are we sure?
*/

template <typename U>
auto step() -> std::enable_if_t<!has_process_yield_v<unwrap_reference_t<U>>> {
auto step() noexcept -> std::enable_if_t<!has_process_yield_v<unwrap_reference_t<U>>> {
using queue_t = typename Q::value_type;
stlab::optional<queue_t> message;
std::array<bool, sizeof...(Args)> do_cts;
Expand Down Expand Up @@ -1135,7 +1133,7 @@ struct shared_process
}

void run() {
_executor([_p = make_weak_ptr(this->shared_from_this())] {
_executor([_p = make_weak_ptr(this->shared_from_this())]() noexcept {
auto p = _p.lock();
if (p) p->template step<T>();
});
Expand Down Expand Up @@ -1352,8 +1350,7 @@ auto zip(S s, R... r) {

/**************************************************************************************************/

struct buffer_size
{
struct buffer_size {
std::size_t _value;
buffer_size(std::size_t b) : _value(b) {}
};
Expand All @@ -1378,15 +1375,16 @@ struct annotated_process {
F _f;
annotations _annotations;

explicit annotated_process(executor_task_pair<F>&& etp) : _f(std::move(etp._f)), _annotations(std::move(etp._executor)) {}
explicit annotated_process(executor_task_pair<F>&& etp) :
_f(std::move(etp._f)), _annotations(std::move(etp._executor)) {}

annotated_process(F f, const executor& e) : _f(std::move(f)), _annotations(e._executor) {}
annotated_process(F f, buffer_size bs) : _f(std::move(f)), _annotations(bs._value) {}

annotated_process(F f, executor&& e) : _f(std::move(f)), _annotations(std::move(e._executor)) {}
annotated_process(F f, annotations&& a) : _f(std::move(f)), _annotations(std::move(a)) {}
annotated_process(executor_task_pair<F>&& etp, buffer_size bs) : _f(std::move(etp._f)), _annotations(std::move(etp._executor), bs) {}

annotated_process(executor_task_pair<F>&& etp, buffer_size bs) :
_f(std::move(etp._f)), _annotations(std::move(etp._executor), bs) {}
};

template <typename B, typename E>
Expand Down Expand Up @@ -1553,8 +1551,8 @@ class STLAB_NODISCARD() receiver {
}

auto operator|(sender<T> send) {
return operator|
([_send = std::move(send)](auto&& x) { _send(std::forward<decltype(x)>(x)); });
return operator|(
[_send = std::move(send)](auto&& x) { _send(std::forward<decltype(x)>(x)); });
}
};

Expand Down
25 changes: 14 additions & 11 deletions stlab/concurrency/default_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@
#ifndef STLAB_CONCURRENCY_DEFAULT_EXECUTOR_HPP
#define STLAB_CONCURRENCY_DEFAULT_EXECUTOR_HPP

#include <stlab/concurrency/set_current_thread_name.hpp>
#include <stlab/concurrency/task.hpp>
#include <stlab/config.hpp>

#include <stlab/pre_exit.hpp>

#include <stlab/concurrency/set_current_thread_name.hpp>
#include <stlab/concurrency/task.hpp>

#include <cassert>
#include <chrono>
#include <functional>
#include <type_traits>

#if STLAB_TASK_SYSTEM(LIBDISPATCH)
#include <dispatch/dispatch.h>
Expand Down Expand Up @@ -91,7 +94,7 @@ struct executor_type {
using result_type = void;

template <typename F>
void operator()(F f) const {
auto operator()(F f) const -> std::enable_if_t<std::is_nothrow_invocable_v<F>> {
using f_t = decltype(f);

dispatch_group_async_f(detail::group()._group,
Expand Down Expand Up @@ -224,7 +227,7 @@ class waiter {
class notification_queue {
struct element_t {
std::size_t _priority;
task<void()> _task;
task<void() noexcept> _task;

template <class F>
element_t(F&& f, std::size_t priority) : _priority{priority}, _task{std::forward<F>(f)} {}
Expand All @@ -250,15 +253,15 @@ class notification_queue {
}

// Must be called under a lock with a non-empty _q, always returns a valid task
auto pop_not_empty() -> task<void()> {
auto pop_not_empty() -> task<void() noexcept> {
auto result = std::move(_q.front()._task);
std::pop_heap(begin(_q), end(_q), element_t::greater());
_q.pop_back();
return result;
}

public:
auto try_pop() -> task<void()> {
auto try_pop() -> task<void() noexcept> {
lock_t lock{_mutex, std::try_to_lock};
if (!lock || _q.empty()) return nullptr;
return pop_not_empty();
Expand All @@ -275,7 +278,7 @@ class notification_queue {
return true;
}

auto pop() -> std::pair<bool, task<void()>> {
auto pop() -> std::pair<bool, task<void() noexcept>> {
lock_t lock{_mutex};
_waiting = true;
while (_q.empty() && !_done && _waiting)
Expand Down Expand Up @@ -341,7 +344,7 @@ class priority_task_system {
void run(unsigned i) {
stlab::set_current_thread_name("cc.stlab.default_executor");
while (true) {
task<void()> f;
task<void() noexcept> f;

for (unsigned n = 0; n != _count && !f; ++n) {
f = _q[(i + n) % _count].try_pop();
Expand Down Expand Up @@ -408,7 +411,7 @@ class priority_task_system {
stlab::set_current_thread_name("cc.stlab.default_executor.expansion");

while (true) {
task<void()> f;
task<void() noexcept> f;

for (unsigned n = 0; n != _count && !f; ++n) {
f = _q[(i + n) % _count].try_pop();
Expand Down Expand Up @@ -458,7 +461,7 @@ template <executor_priority P = executor_priority::medium>
struct executor_type {
using result_type = void;

void operator()(task<void()> f) const {
void operator()(task<void() noexcept>&& f) const {
static task_system<P> only_task_system{[] {
at_pre_exit([]() noexcept { only_task_system.join(); });
return task_system<P>{};
Expand All @@ -473,7 +476,7 @@ template <executor_priority P = executor_priority::medium>
struct executor_type {
using result_type = void;

void operator()(task<void()> f) const {
void operator()(task<void() noexcept>&& f) const {
pts().execute<static_cast<std::size_t>(P)>(std::move(f));
}
};
Expand Down
14 changes: 7 additions & 7 deletions stlab/concurrency/executor_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace stlab {
inline namespace v1 {
/**************************************************************************************************/

using executor_t = std::function<void(stlab::task<void()>)>;
using executor_t = std::function<void(stlab::task<void() noexcept>)>;

/*
* returns an executor that will schedule any passed task to it to execute
Expand All @@ -35,16 +35,17 @@ template <typename Rep, typename Per = std::ratio<1>>
executor_t execute_at(std::chrono::duration<Rep, Per> duration, executor_t executor) {
return [_duration = std::move(duration), _executor = std::move(executor)](auto f) mutable {
if (_duration != std::chrono::duration<Rep, Per>{})
system_timer(_duration, [_f = std::move(f), _executor = std::move(_executor)]() mutable {
_executor(std::move(_f));
});
system_timer(_duration,
[_f = std::move(f), _executor = std::move(_executor)]() mutable noexcept {
_executor(std::move(_f));
});
else
_executor(std::move(f));
};
}

[[deprecated("Use chrono::duration as parameter instead")]]
inline executor_t execute_at(std::chrono::steady_clock::time_point when, executor_t executor) {
[[deprecated("Use chrono::duration as parameter instead")]] inline executor_t execute_at(
std::chrono::steady_clock::time_point when, executor_t executor) {
using namespace std::chrono;
return execute_at(duration_cast<nanoseconds>(when - steady_clock::now()), std::move(executor));
}
Expand Down Expand Up @@ -79,7 +80,6 @@ executor_task_pair<F> operator&(F&& f, executor e) {
return executor_task_pair<F>{std::move(e._executor), std::forward<F>(f)};
}


/**************************************************************************************************/

} // namespace v1
Expand Down
Loading