Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions include/exec/__detail/__basic_sequence.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,35 +71,36 @@ namespace exec {
return {};
}

template <stdexec::__decays_to<__seqexpr> _Self, class _Env>
STDEXEC_MEMFN_DECL(auto get_item_types)(this _Self&& __self, _Env&& __env)
-> decltype(__self.__tag()
.get_item_types(static_cast<_Self&&>(__self), static_cast<_Env&&>(__env))) {
template <stdexec::__decays_to<__seqexpr> _Self, class... _Env>
static auto get_item_types(_Self&& __self, _Env&&... __env)
-> decltype(__self.__tag().get_item_types(
static_cast<_Self&&>(__self),
static_cast<_Env&&>(__env)...)) {
return {};
}

template <stdexec::__decays_to<__seqexpr> _Self, stdexec::receiver _Receiver>
STDEXEC_MEMFN_DECL(auto subscribe)(this _Self&& __self, _Receiver&& __rcvr) noexcept(noexcept(
static auto subscribe(_Self&& __self, _Receiver&& __rcvr) noexcept(noexcept(
__self.__tag().subscribe(static_cast<_Self&&>(__self), static_cast<_Receiver&&>(__rcvr))))
-> decltype(__self.__tag()
.subscribe(static_cast<_Self&&>(__self), static_cast<_Receiver&&>(__rcvr))) {
return __tag_t::subscribe(static_cast<_Self&&>(__self), static_cast<_Receiver&&>(__rcvr));
}

template <class _Sender, class _ApplyFn>
template <class _Sequence, class _ApplyFn>
static auto
apply(_Sender&& __sndr, _ApplyFn&& __fun) noexcept(stdexec::__nothrow_callable<
stdexec::__detail::__impl_of<_Sender>,
stdexec::__copy_cvref_fn<_Sender>,
apply(_Sequence&& __sequence, _ApplyFn&& __fun) noexcept(stdexec::__nothrow_callable<
stdexec::__detail::__impl_of<_Sequence>,
stdexec::__copy_cvref_fn<_Sequence>,
_ApplyFn
>)
-> stdexec::__call_result_t<
stdexec::__detail::__impl_of<_Sender>,
stdexec::__copy_cvref_fn<_Sender>,
stdexec::__detail::__impl_of<_Sequence>,
stdexec::__copy_cvref_fn<_Sequence>,
_ApplyFn
> {
return static_cast<_Sender&&>(__sndr)
.__impl_(stdexec::__copy_cvref_fn<_Sender>(), static_cast<_ApplyFn&&>(__fun));
return static_cast<_Sequence&&>(__sequence)
.__impl_(stdexec::__copy_cvref_fn<_Sequence>(), static_cast<_ApplyFn&&>(__fun));
}
};

Expand Down
11 changes: 2 additions & 9 deletions include/exec/sequence/ignore_all_values.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,6 @@ namespace exec {
using __completion_sigs = __sequence_completion_signatures_of_t<_Child, _Env>;

template <class _Child>
requires receiver_of<_Receiver, __completion_sigs<_Child>>
&& sequence_sender_to<_Child, __receiver_t<_Child>>
auto operator()(__ignore, __ignore, _Child&& __child)
noexcept(__nothrow_constructible_from<__operation_t<_Child>, _Child, _Receiver>)
-> __operation_t<_Child> {
Expand All @@ -287,7 +285,7 @@ namespace exec {

struct ignore_all_values_t {
template <sender _Sender>
auto operator()(_Sender&& __sndr) const {
auto operator()(_Sender&& __sndr) const -> __well_formed_sender auto {
auto __domain = __get_early_domain(static_cast<_Sender&&>(__sndr));
return transform_sender(
__domain, __make_sexpr<ignore_all_values_t>(__(), static_cast<_Sender&&>(__sndr)));
Expand Down Expand Up @@ -320,11 +318,6 @@ namespace exec {
[]<class _Sender, receiver _Receiver>(_Sender&& __sndr, _Receiver __rcvr) noexcept(
__nothrow_callable<__sexpr_apply_t, _Sender, __connect_fn<_Receiver>>)
-> __call_result_t<__sexpr_apply_t, _Sender, __connect_fn<_Receiver>>
requires receiver_of<_Receiver, __completion_sigs<__child_of<_Sender>, env_of_t<_Receiver>>>
&& sequence_sender_to<
__child_of<_Sender>,
__receiver_t<__child_of<_Sender>, _Receiver>
>
{
static_assert(sender_expr_for<_Sender, ignore_all_values_t>);
return __sexpr_apply(static_cast<_Sender&&>(__sndr), __connect_fn<_Receiver>{__rcvr});
Expand All @@ -340,4 +333,4 @@ namespace stdexec {
template <>
struct __sexpr_impl<exec::ignore_all_values_t>
: exec::__ignore_all_values::__ignore_all_values_impl { };
} // namespace stdexec
} // namespace stdexec
4 changes: 2 additions & 2 deletions include/exec/sequence/iterate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ namespace exec {
struct iterate_t {
template <std::ranges::forward_range _Range>
requires __decay_copyable<_Range>
auto operator()(_Range&& __range) const {
auto operator()(_Range&& __range) const -> __well_formed_sequence_sender auto {
return make_sequence_expr<iterate_t>(__decay_t<_Range>{static_cast<_Range&&>(__range)});
}

Expand Down Expand Up @@ -219,4 +219,4 @@ namespace exec {
inline constexpr iterate_t iterate;
} // namespace exec

#endif // STDEXEC_HAS_STD_RANGES()
#endif // STDEXEC_HAS_STD_RANGES()
219 changes: 219 additions & 0 deletions include/exec/sequence/merge.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Copyright (c) 2023 Maikel Nadolski
* Copyright (c) 2023 NVIDIA Corporation
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "../../stdexec/concepts.hpp"
#include "../../stdexec/execution.hpp"
#include "../sequence_senders.hpp"

#include "../__detail/__basic_sequence.hpp"
#include "./transform_each.hpp"
#include "./ignore_all_values.hpp"
#include "stdexec/__detail/__execution_fwd.hpp"
#include "stdexec/__detail/__meta.hpp"
#include "stdexec/__detail/__senders_core.hpp"
#include "stdexec/__detail/__transform_completion_signatures.hpp"

namespace exec {
namespace __merge {
using namespace stdexec;

template <class _Receiver>
struct __operation_base {
_Receiver __receiver_;
};

template <class _ReceiverId>
struct __result_receiver {
using _Receiver = stdexec::__t<_ReceiverId>;

struct __t {
using receiver_concept = stdexec::receiver_t;
using __id = __result_receiver;

__operation_base<_Receiver>* __op_;

void set_value() noexcept {
stdexec::set_value(static_cast<_Receiver&&>(__op_->__receiver_));
}

template <class _Error>
void set_error(_Error&& __error) noexcept {
stdexec::set_error(
static_cast<_Receiver&&>(__op_->__receiver_), static_cast<_Error&&>(__error));
}

void set_stopped() noexcept
{
stdexec::set_stopped(static_cast<_Receiver&&>(__op_->__receiver_));
}

auto get_env() const noexcept -> env_of_t<_Receiver> {
return stdexec::get_env(__op_->__receiver_);
}
};
};

template <class _ReceiverId>
struct __merge_each_fn {
using _Receiver = stdexec::__t<_ReceiverId>;

template <sender _Item>
auto operator()(_Item&& __item, __operation_base<_Receiver>* __op) const noexcept(
__nothrow_callable<set_next_t, _Receiver&, _Item>)
-> next_sender_of_t<_Receiver, _Item> {
return exec::set_next(
__op->__receiver_, static_cast<_Item&&>(__item));
}
};

struct __combine {
template<class _ReceiverId>
using merge_each_fn_t = __binder_back<__merge_each_fn<_ReceiverId>, __operation_base<__t<_ReceiverId>>*>;

template<class _Sequence, class _ReceiverId>
using transform_sender_t = __call_result_t<exec::transform_each_t, _Sequence, merge_each_fn_t<_ReceiverId>>;
template<class _Sequence, class _ReceiverId>
using ignored_sender_t = __call_result_t<exec::ignore_all_values_t, transform_sender_t<_Sequence, _ReceiverId>>;

template<class _ReceiverId, class... _Sequences>
using result_sender_t = __call_result_t<when_all_t,
ignored_sender_t<_Sequences, _ReceiverId>...>;
};

template <class _ReceiverId, class... _Sequences>
struct __operation {
using _Receiver = stdexec::__t<_ReceiverId>;

using merge_each_fn_t = typename __combine::merge_each_fn_t<_ReceiverId>;

template<class _ReceiverIdDependent>
using result_sender_t = typename __combine::result_sender_t<_ReceiverIdDependent, _Sequences...>;

struct __t : __operation_base<_Receiver> {
using __id = __operation;

connect_result_t<result_sender_t<_ReceiverId>, stdexec::__t<__result_receiver<_ReceiverId>>> __op_result_;

__t(_Receiver __rcvr, _Sequences... __sequences)
: __operation_base<
_Receiver
>{static_cast<_Receiver&&>(__rcvr)}
, __op_result_{stdexec::connect(
stdexec::when_all(
exec::ignore_all_values(
exec::transform_each(static_cast<_Sequences&&>(__sequences), merge_each_fn_t{{this}, {}, {}}))...),
stdexec::__t<__result_receiver<_ReceiverId>>{this})} {
}

void start() & noexcept {
stdexec::start(__op_result_);
}
};
};

template <class _Receiver>
struct __subscribe_fn {
_Receiver& __rcvr_;

template <class... _Sequences>
auto operator()(__ignore, _Sequences... __sequences) noexcept(
(__nothrow_decay_copyable<_Sequences> && ...)
&& __nothrow_move_constructible<_Receiver>)
-> __t<__operation<__id<_Receiver>, _Sequences...>> {
return {
static_cast<_Receiver&&>(__rcvr_),
static_cast<_Sequences&&>(__sequences)...};
}
};

struct merge_t {
template <class... _Sequences>
auto operator()(_Sequences&&... __sequences) const
noexcept((__nothrow_decay_copyable<_Sequences> && ...))
-> __well_formed_sequence_sender auto {
auto __domain = __common_domain_t<_Sequences...>();
return transform_sender(
__domain, make_sequence_expr<merge_t>(
static_cast<_Sequences&&>(__sequences)...));
}

template <class... _Args>
using __all_nothrow_decay_copyable = __mbool<(__nothrow_decay_copyable<_Args> && ...)>;

template <class _Error>
using __set_error_t = completion_signatures<set_error_t(__decay_t<_Error>)>;

struct _INVALID_ARGUMENTS_TO_MERGE_ { };

template <class _Self, class _Env>
using __error_t = __mexception<
_INVALID_ARGUMENTS_TO_MERGE_,
__children_of<_Self, __q<_WITH_SEQUENCES_>>,
_WITH_ENVIRONMENT_<_Env>
>;

template <class... _Env>
struct __completions_t {

template <class... _Sequences>
using __f = __meval<
__concat_completion_signatures,
completion_signatures<set_stopped_t()>,
__sequence_completion_signatures_of_t<_Sequences, _Env...>...
>;
};

template <class _Self, class... _Env>
using __completions = __children_of<_Self, __completions_t<_Env...>>;

template <sender_expr_for<merge_t> _Self, class... _Env>
static auto get_completion_signatures(_Self&&, _Env&&...) noexcept {
return __minvoke<__mtry_catch<__q<__completions>, __q<__error_t>>, _Self, _Env...>();
}

template <class... _Env>
struct __items_t {

template <class... _Sequences>
using __f = stdexec::__mapply<
stdexec::__munique<stdexec::__q<exec::item_types>>,
stdexec::__minvoke<
stdexec::__mconcat<stdexec::__qq<exec::item_types>>,
__item_types_of_t<_Sequences, _Env...>...>>;
};

template <class _Self, class... _Env>
using __items = __children_of<_Self, __items_t<_Env...>>;

template <sender_expr_for<merge_t> _Self, class... _Env>
static auto get_item_types(_Self&&, _Env&&...) noexcept {
return __minvoke<__mtry_catch<__q<__items>, __q<__error_t>>, _Self, _Env...>();
}

template <sender_expr_for<merge_t> _Self, receiver _Receiver>
static auto subscribe(_Self&& __self, _Receiver __rcvr)
noexcept(__nothrow_callable<__sexpr_apply_t, _Self, __subscribe_fn<_Receiver>>)
-> __sexpr_apply_result_t<_Self, __subscribe_fn<_Receiver>> {
return __sexpr_apply(static_cast<_Self&&>(__self), __subscribe_fn<_Receiver>{__rcvr});
}
};
} // namespace __merge

using __merge::merge_t;
inline constexpr merge_t merge{};
} // namespace exec
Loading
Loading