diff --git a/include/exec/__detail/__basic_sequence.hpp b/include/exec/__detail/__basic_sequence.hpp index ff91f1096..0b2bdf857 100644 --- a/include/exec/__detail/__basic_sequence.hpp +++ b/include/exec/__detail/__basic_sequence.hpp @@ -71,35 +71,36 @@ namespace exec { return {}; } - template _Self, class _Env> - STDEXEC_MEMFN_DECL(auto get_item_types)(this _Self&& __self, _Env&& __env) - -> decltype(__self.__tag() - .get_item_types(static_cast<_Self&&>(__self), static_cast<_Env&&>(__env))) { + template _Self, class... _Env> + static auto get_item_types(_Self&& __self, _Env&&... __env) + -> decltype(__self.__tag().get_item_types( + static_cast<_Self&&>(__self), + static_cast<_Env&&>(__env)...)) { return {}; } template _Self, stdexec::receiver _Receiver> - STDEXEC_MEMFN_DECL(auto subscribe)(this _Self&& __self, _Receiver&& __rcvr) noexcept(noexcept( + static auto subscribe(_Self&& __self, _Receiver&& __rcvr) noexcept(noexcept( __self.__tag().subscribe(static_cast<_Self&&>(__self), static_cast<_Receiver&&>(__rcvr)))) -> decltype(__self.__tag() .subscribe(static_cast<_Self&&>(__self), static_cast<_Receiver&&>(__rcvr))) { return __tag_t::subscribe(static_cast<_Self&&>(__self), static_cast<_Receiver&&>(__rcvr)); } - template + template static auto - apply(_Sender&& __sndr, _ApplyFn&& __fun) noexcept(stdexec::__nothrow_callable< - stdexec::__detail::__impl_of<_Sender>, - stdexec::__copy_cvref_fn<_Sender>, + apply(_Sequence&& __sequence, _ApplyFn&& __fun) noexcept(stdexec::__nothrow_callable< + stdexec::__detail::__impl_of<_Sequence>, + stdexec::__copy_cvref_fn<_Sequence>, _ApplyFn >) -> stdexec::__call_result_t< - stdexec::__detail::__impl_of<_Sender>, - stdexec::__copy_cvref_fn<_Sender>, + stdexec::__detail::__impl_of<_Sequence>, + stdexec::__copy_cvref_fn<_Sequence>, _ApplyFn > { - return static_cast<_Sender&&>(__sndr) - .__impl_(stdexec::__copy_cvref_fn<_Sender>(), static_cast<_ApplyFn&&>(__fun)); + return static_cast<_Sequence&&>(__sequence) + .__impl_(stdexec::__copy_cvref_fn<_Sequence>(), static_cast<_ApplyFn&&>(__fun)); } }; diff --git a/include/exec/sequence/ignore_all_values.hpp b/include/exec/sequence/ignore_all_values.hpp index 3444260ff..7c403b692 100644 --- a/include/exec/sequence/ignore_all_values.hpp +++ b/include/exec/sequence/ignore_all_values.hpp @@ -276,8 +276,6 @@ namespace exec { using __completion_sigs = __sequence_completion_signatures_of_t<_Child, _Env>; template - requires receiver_of<_Receiver, __completion_sigs<_Child>> - && sequence_sender_to<_Child, __receiver_t<_Child>> auto operator()(__ignore, __ignore, _Child&& __child) noexcept(__nothrow_constructible_from<__operation_t<_Child>, _Child, _Receiver>) -> __operation_t<_Child> { @@ -287,7 +285,7 @@ namespace exec { struct ignore_all_values_t { template - auto operator()(_Sender&& __sndr) const { + auto operator()(_Sender&& __sndr) const -> __well_formed_sender auto { auto __domain = __get_early_domain(static_cast<_Sender&&>(__sndr)); return transform_sender( __domain, __make_sexpr(__(), static_cast<_Sender&&>(__sndr))); @@ -320,11 +318,6 @@ namespace exec { [](_Sender&& __sndr, _Receiver __rcvr) noexcept( __nothrow_callable<__sexpr_apply_t, _Sender, __connect_fn<_Receiver>>) -> __call_result_t<__sexpr_apply_t, _Sender, __connect_fn<_Receiver>> - requires receiver_of<_Receiver, __completion_sigs<__child_of<_Sender>, env_of_t<_Receiver>>> - && sequence_sender_to< - __child_of<_Sender>, - __receiver_t<__child_of<_Sender>, _Receiver> - > { static_assert(sender_expr_for<_Sender, ignore_all_values_t>); return __sexpr_apply(static_cast<_Sender&&>(__sndr), __connect_fn<_Receiver>{__rcvr}); @@ -340,4 +333,4 @@ namespace stdexec { template <> struct __sexpr_impl : exec::__ignore_all_values::__ignore_all_values_impl { }; -} // namespace stdexec \ No newline at end of file +} // namespace stdexec diff --git a/include/exec/sequence/iterate.hpp b/include/exec/sequence/iterate.hpp index 58a39315c..1a71dd255 100644 --- a/include/exec/sequence/iterate.hpp +++ b/include/exec/sequence/iterate.hpp @@ -167,7 +167,7 @@ namespace exec { struct iterate_t { template requires __decay_copyable<_Range> - auto operator()(_Range&& __range) const { + auto operator()(_Range&& __range) const -> __well_formed_sequence_sender auto { return make_sequence_expr(__decay_t<_Range>{static_cast<_Range&&>(__range)}); } @@ -219,4 +219,4 @@ namespace exec { inline constexpr iterate_t iterate; } // namespace exec -#endif // STDEXEC_HAS_STD_RANGES() \ No newline at end of file +#endif // STDEXEC_HAS_STD_RANGES() diff --git a/include/exec/sequence/merge.hpp b/include/exec/sequence/merge.hpp new file mode 100644 index 000000000..01511e6ef --- /dev/null +++ b/include/exec/sequence/merge.hpp @@ -0,0 +1,216 @@ +/* + * Copyright (c) 2023 Maikel Nadolski + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "../../stdexec/concepts.hpp" +#include "../../stdexec/execution.hpp" +#include "../sequence_senders.hpp" + +#include "../__detail/__basic_sequence.hpp" +#include "./transform_each.hpp" +#include "./ignore_all_values.hpp" +#include "stdexec/__detail/__execution_fwd.hpp" +#include "stdexec/__detail/__meta.hpp" +#include "stdexec/__detail/__senders_core.hpp" +#include "stdexec/__detail/__transform_completion_signatures.hpp" + +namespace exec { + namespace __merge { + using namespace stdexec; + + template + struct __operation_base { + _Receiver __receiver_; + }; + + template + struct __result_receiver { + using _Receiver = stdexec::__t<_ReceiverId>; + + struct __t { + using receiver_concept = stdexec::receiver_t; + using __id = __result_receiver; + + __operation_base<_Receiver>* __op_; + + void set_value() noexcept { + stdexec::set_value(static_cast<_Receiver&&>(__op_->__receiver_)); + } + + template + void set_error(_Error&& __error) noexcept { + stdexec::set_error( + static_cast<_Receiver&&>(__op_->__receiver_), static_cast<_Error&&>(__error)); + } + + void set_stopped() noexcept + { + stdexec::set_stopped(static_cast<_Receiver&&>(__op_->__receiver_)); + } + + auto get_env() const noexcept -> env_of_t<_Receiver> { + return stdexec::get_env(__op_->__receiver_); + } + }; + }; + + template + struct __merge_each_fn { + using _Receiver = stdexec::__t<_ReceiverId>; + + template + auto operator()(_Item&& __item, __operation_base<_Receiver>* __op) const noexcept( + __nothrow_callable) + -> next_sender_of_t<_Receiver, _Item> { + return exec::set_next( + __op->__receiver_, static_cast<_Item&&>(__item)); + } + }; + + struct __combine { + template + using merge_each_fn_t = __binder_back<__merge_each_fn<_ReceiverId>, __operation_base<__t<_ReceiverId>>*>; + + template + using transform_sender_t = __call_result_t>; + template + using ignored_sender_t = __call_result_t>; + + template + using result_sender_t = __call_result_t...>; + }; + + template + struct __operation { + using _Receiver = stdexec::__t<_ReceiverId>; + + using merge_each_fn_t = typename __combine::merge_each_fn_t<_ReceiverId>; + + template + using result_sender_t = typename __combine::result_sender_t<_ReceiverIdDependent, _Sequences...>; + + struct __t : __operation_base<_Receiver> { + using __id = __operation; + + connect_result_t, stdexec::__t<__result_receiver<_ReceiverId>>> __op_result_; + + __t(_Receiver __rcvr, _Sequences... __sequences) + : __operation_base< + _Receiver + >{static_cast<_Receiver&&>(__rcvr)} + , __op_result_{stdexec::connect( + stdexec::when_all( + exec::ignore_all_values( + exec::transform_each(static_cast<_Sequences&&>(__sequences), merge_each_fn_t{{this}, {}, {}}))...), + stdexec::__t<__result_receiver<_ReceiverId>>{this})} { + } + + void start() & noexcept { + stdexec::start(__op_result_); + } + }; + }; + + template + struct __subscribe_fn { + _Receiver& __rcvr_; + + template + auto operator()(__ignore, __ignore, _Sequences... __sequences) noexcept( + (__nothrow_decay_copyable<_Sequences> && ...) + && __nothrow_move_constructible<_Receiver>) + -> __t<__operation<__id<_Receiver>, _Sequences...>> { + return { + static_cast<_Receiver&&>(__rcvr_), + static_cast<_Sequences&&>(__sequences)...}; + } + }; + + struct merge_t { + template + auto operator()(_Sequences&&... __sequences) const + noexcept((__nothrow_decay_copyable<_Sequences> && ...)) + -> __well_formed_sequence_sender auto { + auto __domain = __common_domain_t<_Sequences...>(); + return transform_sender( + __domain, make_sequence_expr(__(), + static_cast<_Sequences&&>(__sequences)...)); + } + + template + using __set_error_t = completion_signatures)>; + + struct _INVALID_ARGUMENTS_TO_MERGE_ { }; + + template + using __error_t = __mexception< + _INVALID_ARGUMENTS_TO_MERGE_, + __children_of<_Self, __q<_WITH_SEQUENCES_>>, + _WITH_ENVIRONMENT_<_Env> + >; + + template + struct __completions_fn_t { + + template + using __f = __meval< + __concat_completion_signatures, + completion_signatures, + __sequence_completion_signatures_of_t<_Sequences, _Env...>... + >; + }; + + template + using __completions_t = __children_of<_Self, __completions_fn_t<_Env...>>; + + template _Self, class... _Env> + static auto get_completion_signatures(_Self&&, _Env&&...) noexcept { + return __minvoke<__mtry_catch<__q<__completions_t>, __q<__error_t>>, _Self, _Env...>(); + } + + template + struct __items_fn_t { + + template + using __f = stdexec::__mapply< + stdexec::__munique>, + stdexec::__minvoke< + stdexec::__mconcat>, + __item_types_of_t<_Sequences, _Env...>...>>; + }; + + template + using __items_t = __children_of<_Self, __items_fn_t<_Env...>>; + + template _Self, class... _Env> + static auto get_item_types(_Self&&, _Env&&...) noexcept { + return __minvoke<__mtry_catch<__q<__items_t>, __q<__error_t>>, _Self, _Env...>(); + } + + template _Self, receiver _Receiver> + static auto subscribe(_Self&& __self, _Receiver __rcvr) + noexcept(__nothrow_callable<__sexpr_apply_t, _Self, __subscribe_fn<_Receiver>>) + -> __sexpr_apply_result_t<_Self, __subscribe_fn<_Receiver>> { + return __sexpr_apply(static_cast<_Self&&>(__self), __subscribe_fn<_Receiver>{__rcvr}); + } + }; + } // namespace __merge + + using __merge::merge_t; + inline constexpr merge_t merge{}; +} // namespace exec diff --git a/include/exec/sequence/transform_each.hpp b/include/exec/sequence/transform_each.hpp index 3e3fe05a4..cf8d0d743 100644 --- a/include/exec/sequence/transform_each.hpp +++ b/include/exec/sequence/transform_each.hpp @@ -21,6 +21,8 @@ #include "../sequence_senders.hpp" #include "../__detail/__basic_sequence.hpp" +#include "stdexec/__detail/__diagnostics.hpp" +#include "stdexec/__detail/__meta.hpp" namespace exec { namespace __transform_each { @@ -42,8 +44,6 @@ namespace exec { __operation_base<_Receiver, _Adaptor>* __op_; template - requires __callable<_Adaptor&, _Item> - && __callable> auto set_next(_Item&& __item) & noexcept( __nothrow_callable> && __nothrow_callable<_Adaptor&, _Item>) @@ -57,7 +57,6 @@ namespace exec { } template - requires __callable void set_error(_Error&& __error) noexcept { stdexec::set_error( static_cast<_Receiver&&>(__op_->__receiver_), static_cast<_Error&&>(__error)); @@ -121,29 +120,37 @@ namespace exec { template struct _WITH_ITEM_SENDER_ { }; - template - auto __try_call(_Item*) -> stdexec::__mexception< - _NOT_CALLABLE_ADAPTOR_<_Adaptor&>, - _WITH_ITEM_SENDER_> - >; + template + struct __try_adaptor_calls_t { + + template + auto __try_adaptor_for_item(_Item*) -> stdexec::__mexception< + _NOT_CALLABLE_ADAPTOR_<_Adaptor&>, + _WITH_ITEM_SENDER_> + >; - template - requires stdexec::__callable<_Adaptor&, _Item> - auto __try_call(_Item*) -> stdexec::__msuccess; + template + requires stdexec::__callable<_Adaptor&, _Item> + auto __try_adaptor_for_item(_Item*) -> stdexec::__msuccess; + + template + auto operator()(item_types<_Items...>*) -> decltype(( + stdexec::__msuccess(), ..., __try_adaptor_for_item(static_cast<_Items*>(nullptr)))); + }; - template - auto __try_calls(item_types<_Items...>*) -> decltype(( - stdexec::__msuccess() && ... && __try_call<_Adaptor>(static_cast<_Items*>(nullptr)))); + template + using __try_adaptor_calls_result_t = __call_result_t<__try_adaptor_calls_t>, _Items>; template - concept __callabale_adaptor_for = requires(_Items* __items) { - { __try_calls>(__items) } -> stdexec::__ok; + concept __callable_adaptor_for = requires(_Items* __items) { + { __try_adaptor_calls_t>{}(__items) } -> stdexec::__ok; }; struct transform_each_t { template auto operator()(_Sequence&& __sndr, _Adaptor&& __adaptor) const - noexcept(__nothrow_decay_copyable<_Sequence> && __nothrow_decay_copyable<_Adaptor>) { + noexcept(__nothrow_decay_copyable<_Sequence> && __nothrow_decay_copyable<_Adaptor>) + -> __well_formed_sequence_sender auto { return make_sequence_expr( static_cast<_Adaptor&&>(__adaptor), static_cast<_Sequence&&>(__sndr)); } @@ -155,12 +162,12 @@ namespace exec { return {{static_cast<_Adaptor&&>(__adaptor)}, {}, {}}; } - template - using __completion_sigs_t = __sequence_completion_signatures_of_t<__child_of<_Self>, _Env>; + template + using __completion_sigs_t = __sequence_completion_signatures_of_t<__child_of<_Self>, _Env...>; - template _Self, class _Env> + template _Self, class... _Env> static auto - get_completion_signatures(_Self&&, _Env&&) noexcept -> __completion_sigs_t<_Self, _Env> { + get_completion_signatures(_Self&&, _Env&&...) noexcept -> __completion_sigs_t<_Self, _Env...> { return {}; } @@ -170,11 +177,61 @@ namespace exec { stdexec::__mbind_front_q<__call_result_t, __data_of<_Self>&>, stdexec::__munique> >, - item_types_of_t<__child_of<_Self>, _Env...> + __item_types_of_t<__child_of<_Self>, _Env...> >; - template _Self, class _Env> - static auto get_item_types(_Self&&, _Env&&) noexcept -> __item_types_t<_Self, _Env> { + template + struct _TRANSFORM_EACH_ADAPTOR_INVOCATION_FAILED_ {}; + + template _Self, class... _Env> + requires (!__mvalid<__item_types_t, _Self, _Env...>) + && __mvalid<__item_types_of_t, __child_of<_Self>, _Env...> + && (!__callable_adaptor_for< + __data_of<_Self>, + __item_types_of_t<__child_of<_Self>, _Env...> + >) + static auto get_item_types(_Self&&, _Env&&...) noexcept -> __mexception< + _TRANSFORM_EACH_ADAPTOR_INVOCATION_FAILED_<_Self>, + _WITH_SEQUENCE_<__child_of<_Self>>, + _WITH_ENVIRONMENT_<_Env...>, + _WITH_TYPE_<__try_adaptor_calls_result_t< + __data_of<_Self>, + __item_types_of_t<__child_of<_Self>, _Env...>>>>; + + template + struct _TRANSFORM_EACH_ITEM_TYPES_OF_THE_CHILD_ARE_INVALID_ {}; + + template _Self, class... _Env> + requires (!__mvalid<__item_types_t, _Self, _Env...>) + && (!__mvalid<__item_types_of_t, __child_of<_Self>, _Env...>) + static auto get_item_types(_Self&&, _Env&&...) noexcept -> __mexception< + _TRANSFORM_EACH_ITEM_TYPES_OF_THE_CHILD_ARE_INVALID_<_Self>, + _WITH_SEQUENCE_<__child_of<_Self>>, + _WITH_ENVIRONMENT_<_Env...>>; + + template + struct _TRANSFORM_EACH_ITEM_TYPES_CALCULATION_FAILED_ {}; + + template _Self, class... _Env> + requires (!__mvalid<__item_types_t, _Self, _Env...>) + && __mvalid<__item_types_of_t, __child_of<_Self>, _Env...> + && __callable_adaptor_for< + __data_of<_Self>, + __item_types_of_t<__child_of<_Self>, _Env...> + > + static auto get_item_types(_Self&&, _Env&&...) noexcept -> __mexception< + _TRANSFORM_EACH_ITEM_TYPES_CALCULATION_FAILED_<_Self>, + _WITH_SEQUENCE_<__child_of<_Self>>, + _WITH_ENVIRONMENT_<_Env...>>; + + template _Self, class... _Env> + requires __mvalid<__item_types_t, _Self, _Env...> + && __mvalid<__item_types_of_t, __child_of<_Self>, _Env...> + && __callable_adaptor_for< + __data_of<_Self>, + __item_types_of_t<__child_of<_Self>, _Env...> + > + static auto get_item_types(_Self&&, _Env&&...) noexcept -> __item_types_t<_Self, _Env...> { return {}; } @@ -185,12 +242,6 @@ namespace exec { using __operation_t = __t<__operation<__child_of<_Self>, __id<_Receiver>, __data_of<_Self>>>; template _Self, receiver _Receiver> - requires __callabale_adaptor_for< - __data_of<_Self>, - item_types_of_t<__child_of<_Self>, env_of_t<_Receiver>> - > - && sequence_receiver_of<_Receiver, __item_types_t<_Self, env_of_t<_Receiver>>> - && sequence_sender_to<__child_of<_Self>, __receiver_t<_Self, _Receiver>> static auto subscribe(_Self&& __self, _Receiver __rcvr) noexcept(__nothrow_callable<__sexpr_apply_t, _Self, __subscribe_fn<_Receiver>>) -> __call_result_t<__sexpr_apply_t, _Self, __subscribe_fn<_Receiver>> { @@ -208,4 +259,4 @@ namespace exec { using __transform_each::transform_each_t; inline constexpr transform_each_t transform_each{}; -} // namespace exec \ No newline at end of file +} // namespace exec diff --git a/include/exec/sequence_senders.hpp b/include/exec/sequence_senders.hpp index 3f12e1305..06fbf9326 100644 --- a/include/exec/sequence_senders.hpp +++ b/include/exec/sequence_senders.hpp @@ -17,6 +17,8 @@ #pragma once #include "../stdexec/execution.hpp" +#include "stdexec/__detail/__concepts.hpp" +#include "stdexec/__detail/__meta.hpp" namespace exec { struct sequence_sender_t : stdexec::sender_t { }; @@ -31,20 +33,22 @@ namespace exec { template using __f = __mand<__mapply<__mcontains<_Needles>, _Haystack>...>; }; - template - using __mall_contained_in = __mapply<__mall_contained_in_impl<_Haystack>, _Needles>; + using __mall_contained_in_t = __mapply<__mall_contained_in_impl<_Haystack>, _Needles>; template - concept __all_contained_in = __v<__mall_contained_in<_Needles, _Haystack>>; + concept __all_contained_in_t = __v<__mall_contained_in_t<_Needles, _Haystack>>; + } // namespace __sequence_sndr // This concept checks if a given sender satisfies the requirements to be returned from `set_next`. - template > - concept next_sender = sender_in<_Sender, _Env> - && __all_contained_in< - completion_signatures_of_t<_Sender, _Env>, - completion_signatures - >; + template > + concept next_sender = stdexec::sender_in<_Sender, _Env> + && __sequence_sndr::__all_contained_in_t< + stdexec::completion_signatures_of_t<_Sender, _Env>, + stdexec::completion_signatures + >; + + namespace __sequence_sndr { template concept __has_set_next_member = requires(_Receiver& __rcvr, _Item&& __item) { @@ -83,10 +87,10 @@ namespace exec { using __sequence_sndr::set_next_t; inline constexpr set_next_t set_next; - template + template using next_sender_of_t = decltype(exec::set_next( stdexec::__declval&>(), - stdexec::__declval<_Sender>())); + stdexec::__declval<_Sequence>())); namespace __sequence_sndr { @@ -96,7 +100,7 @@ namespace exec { using receiver_concept = stdexec::receiver_t; using __id = __stopped_means_break; using _Receiver = stdexec::__t<_ReceiverId>; - using _Token = stop_token_of_t>; + using __token_t = stop_token_of_t>; STDEXEC_ATTRIBUTE(no_unique_address) _Receiver __rcvr_; auto get_env() const noexcept -> env_of_t<_Receiver> { @@ -111,9 +115,9 @@ namespace exec { void set_stopped() noexcept requires __callable - && (unstoppable_token<_Token> || __callable) + && (unstoppable_token<__token_t> || __callable) { - if constexpr (unstoppable_token<_Token>) { + if constexpr (unstoppable_token<__token_t>) { stdexec::set_value(static_cast<_Receiver&&>(__rcvr_)); } else { auto __token = stdexec::get_stop_token(stdexec::get_env(__rcvr_)); @@ -131,80 +135,122 @@ namespace exec { using __stopped_means_break_t = __t<__stopped_means_break<__id<__decay_t<_Rcvr>>>>; } // namespace __sequence_sndr - template + template concept __enable_sequence_sender = requires { - typename _Sender::sender_concept; - } && stdexec::derived_from; + typename _Sequence::sender_concept; + } && stdexec::derived_from; - template - inline constexpr bool enable_sequence_sender = __enable_sequence_sender<_Sender>; + template + inline constexpr bool enable_sequence_sender = __enable_sequence_sender<_Sequence>; template struct item_types { }; + template + concept __has_item_typedef = requires { typename _Tp::item_types; }; + + namespace __debug { + using namespace stdexec::__debug; + + struct __item_types { }; + } // namespace __debug + + namespace __errs { + using namespace stdexec; + inline constexpr __mstring __unrecognized_sequence_type_diagnostic = + "The given type cannot be used as a sequence with the given environment " + "because the attempt to compute the item types failed."_mstr; + } // namespace __errs + + template + struct _WITH_SEQUENCE_; + + template + struct _WITH_SEQUENCES_; + + template + struct _UNRECOGNIZED_SEQUENCE_TYPE_; + ///////////////////////////////////////////////////////////////////////////// - // [execution.sndtraits] + // [execution.seqtraits] namespace __sequence_sndr { struct get_item_types_t; - template - using __tfx_sender = - transform_sender_result_t<__late_domain_of_t<_Sender, _Env>, _Sender, _Env>; - template - concept __with_tag_invoke = tag_invocable, _Env>; + template + using __item_types_of_t = + __call_result_t; - template - using __member_alias_t = typename __decay_t<__tfx_sender<_Sender, _Env>>::item_types; + template + using __unrecognized_sequence_error_t = + __mexception<_UNRECOGNIZED_SEQUENCE_TYPE_<>, _WITH_SEQUENCE_<_Sequence>, _WITH_ENVIRONMENT_<_Env>...>; - template - concept __with_member_alias = __mvalid<__member_alias_t, _Sender, _Env>; + template + using __member_result_t = decltype(__declval<_Sequence>() + .get_item_types(__declval<_Env>())); - template - concept __with_member = requires(__tfx_sender<_Sender, _Env>&& __sndr, _Env&& __env) { - static_cast<__tfx_sender<_Sender, _Env> &&>(__sndr) - .get_item_types(static_cast<_Env &&>(__env)); - }; + template + using __static_member_result_t = decltype(STDEXEC_REMOVE_REFERENCE( + _Sequence)::get_item_types(__declval<_Sequence>(), __declval<_Env>())); + + template + using __tfx_sequence_t = + transform_sender_result_t<__late_domain_of_t<_Sequence, _Env>, _Sequence, _Env>; + + template + concept __with_tag_invoke = tag_invocable, _Env>; + + template + using __member_alias_t = typename __decay_t<__tfx_sequence_t<_Sequence, _Env>>::item_types; + + template + concept __with_member_alias = __mvalid<__member_alias_t, _Sequence, _Env>; + + template + concept __with_static_member = __mvalid<__static_member_result_t, _Sequence, _Env>; + + template + concept __with_member = __mvalid<__member_result_t, _Sequence, _Env...>; struct get_item_types_t { - template + template static auto __impl() { - static_assert(sizeof(_Sender), "Incomplete type used with get_item_types"); + static_assert(sizeof(_Sequence), "Incomplete type used with get_item_types"); static_assert(sizeof(_Env), "Incomplete type used with get_item_types"); - using _TfxSender = __tfx_sender<_Sender, _Env>; - if constexpr (__merror<_TfxSender>) { + using __tfx_sequence_t = __tfx_sequence_t<_Sequence, _Env>; + if constexpr (__merror<__tfx_sequence_t>) { // Computing the type of the transformed sender returned an error type. Propagate it. - return static_cast<_TfxSender (*)()>(nullptr); - } else if constexpr (__with_member_alias<_Sender, _Env>) { - using _Result = __member_alias_t<_Sender, _Env>; - return static_cast<_Result (*)()>(nullptr); - } else if constexpr (__with_member<_Sender, _Env>) { - using _Result = decltype(__declval<_TfxSender>().get_item_types(__declval<_Env>())); - return static_cast<_Result (*)()>(nullptr); - } else if constexpr (__with_tag_invoke<_Sender, _Env>) { - using _Result = tag_invoke_result_t; - return static_cast<_Result (*)()>(nullptr); + return static_cast<__tfx_sequence_t (*)()>(nullptr); + } else if constexpr (__with_member_alias<__tfx_sequence_t, _Env>) { + using __result_t = __member_alias_t<__tfx_sequence_t, _Env>; + return static_cast<__result_t (*)()>(nullptr); + } else if constexpr (__with_static_member<__tfx_sequence_t, _Env>) { + using __result_t = __static_member_result_t<__tfx_sequence_t, _Env>; + return static_cast<__result_t (*)()>(nullptr); + } else if constexpr (__with_member<__tfx_sequence_t, _Env>) { + using __result_t = decltype(__declval<__tfx_sequence_t>().get_item_types(__declval<_Env>())); + return static_cast<__result_t (*)()>(nullptr); + } else if constexpr (__with_tag_invoke<__tfx_sequence_t, _Env>) { + using __result_t = tag_invoke_result_t; + return static_cast<__result_t (*)()>(nullptr); } else if constexpr ( - sender_in<_TfxSender, _Env> && !enable_sequence_sender>) { - using _Result = item_types>; - return static_cast<_Result (*)()>(nullptr); + sender_in<__tfx_sequence_t, _Env> && !enable_sequence_sender>) { + using __result_t = item_types>; + return static_cast<__result_t (*)()>(nullptr); } else if constexpr (__is_debug_env<_Env>) { + using __tag_invoke::tag_invoke; // This ought to cause a hard error that indicates where the problem is. - using _Completions - [[maybe_unused]] = decltype(__declval<_TfxSender>().get_item_types(__declval<_Env>())); - return static_cast<__debug::__completion_signatures (*)()>(nullptr); + using __item_types_t + [[maybe_unused]] = tag_invoke_result_t; + return static_cast<__debug::__item_types (*)()>(nullptr); } else { - using _Result = __mexception< - _UNRECOGNIZED_SENDER_TYPE_<>, - _WITH_SENDER_<_Sender>, - _WITH_ENVIRONMENT_<_Env> - >; - return static_cast<_Result (*)()>(nullptr); + using __result_t = __unrecognized_sequence_error_t<_Sequence, _Env>; + return static_cast<__result_t (*)()>(nullptr); } } - template > + template > constexpr auto - operator()(_Sender&&, _Env&& = {}) const noexcept -> decltype(__impl<_Sender, _Env>()()) { + operator()(_Sequence&&, _Env&& = {}) const noexcept -> decltype(__impl<_Sequence, _Env>()()) { return {}; } }; @@ -213,22 +259,87 @@ namespace exec { using __sequence_sndr::get_item_types_t; inline constexpr get_item_types_t get_item_types{}; - template - using item_types_of_t = - decltype(get_item_types(stdexec::__declval<_Sender>(), stdexec::__declval<_Env>()...)); - - template - concept sequence_sender = stdexec::sender_in<_Sender, _Env...> - && enable_sequence_sender>; + template + concept sequence_sender = stdexec::sender_in<_Sequence, _Env...> + && enable_sequence_sender>; - template - concept has_sequence_item_types = requires(_Sender&& __sndr, _Env&&... __env) { - get_item_types(static_cast<_Sender &&>(__sndr), static_cast<_Env &&>(__env)...); + template + concept has_sequence_item_types = requires(_Sequence&& __sequence, _Env&&... __env) { + { get_item_types(static_cast<_Sequence &&>(__sequence), static_cast<_Env &&>(__env)...) }; }; - template - concept sequence_sender_in = sequence_sender<_Sender, _Env...> - && has_sequence_item_types<_Sender, _Env...>; + template + concept sequence_sender_in = sequence_sender<_Sequence, _Env...> + && has_sequence_item_types<_Sequence, _Env...>; + + template + using __item_types_of_t = + decltype(get_item_types(stdexec::__declval<_Sequence>(), stdexec::__declval<_Env>()...)); + + + template + struct _SEQUENCE_ITEM_IS_NOT_A_WELL_FORMED_SENDER_ { }; + + template + auto __check_item(_Item*) -> stdexec::__mexception< + _SEQUENCE_ITEM_IS_NOT_A_WELL_FORMED_SENDER_<_Item>, + _WITH_SEQUENCE_<_Sequence> + >; + + template + requires stdexec::__well_formed_sender<_Item> + auto __check_item(_Item*) -> stdexec::__msuccess; + + template + requires stdexec::__merror<_Items> + auto __check_items(_Items*) -> _Items; + + template + struct _SEQUENCE_GET_ITEM_TYPES_RESULT_IS_NOT_WELL_FORMED_ { }; + + template + requires (!stdexec::__merror<_Items>) + auto __check_items(_Items*) -> stdexec::__mexception< + _SEQUENCE_GET_ITEM_TYPES_RESULT_IS_NOT_WELL_FORMED_<_Items>, + _WITH_SEQUENCE_<_Sequence> + >; + + template + auto __check_items(exec::item_types<_Items...>*) -> decltype(( + stdexec::__msuccess(), + ..., + exec::__check_item<_Sequence>(static_cast<_Items*>(nullptr)))); + + template + requires stdexec::__merror<_Sequence> + auto __check_sequence(_Sequence*) -> _Sequence; + + struct _SEQUENCE_GET_ITEM_TYPES_IS_NOT_WELL_FORMED_ {}; + + template + requires (!stdexec::__merror<_Sequence>) + && (!stdexec::__mvalid<__item_types_of_t, _Sequence>) + auto __check_sequence(_Sequence*) -> stdexec::__mexception< + _SEQUENCE_GET_ITEM_TYPES_IS_NOT_WELL_FORMED_, + _WITH_SEQUENCE_<_Sequence> + >; + + template + requires (!stdexec::__merror<_Sequence>) + && stdexec::__mvalid<__item_types_of_t, _Sequence> + auto __check_sequence(_Sequence*) -> decltype( + exec::__check_items<_Sequence>(static_cast<__item_types_of_t<_Sequence>*>(nullptr))); + + template + concept __well_formed_item_senders = has_sequence_item_types> + && requires(stdexec::__decay_t<_Sequence>* __sequence) { + { exec::__check_sequence(__sequence) } -> stdexec::__ok; + }; + + template + concept __well_formed_sequence_sender = stdexec::__well_formed_sender<_Sequence> + && enable_sequence_sender> + && __well_formed_item_senders<_Sequence>; template struct _WITH_RECEIVER_ { }; @@ -271,14 +382,14 @@ namespace exec { >; template - using __item_completion_signatures = stdexec::transform_completion_signatures< + using __item_completion_signatures_t = stdexec::transform_completion_signatures< stdexec::__completion_signatures_of_t<_Sender, _Env...>, stdexec::completion_signatures, stdexec::__mconst>::__f >; template - using __sequence_completion_signatures = stdexec::transform_completion_signatures< + using __sequence_completion_signatures_t = stdexec::transform_completion_signatures< stdexec::__completion_signatures_of_t<_Sequence, _Env...>, stdexec::completion_signatures, stdexec::__mconst>::__f @@ -287,37 +398,42 @@ namespace exec { template using __sequence_completion_signatures_of_t = stdexec::__mapply< stdexec::__mtransform< - stdexec::__mbind_back_q<__item_completion_signatures, _Env...>, + stdexec::__mbind_back_q<__item_completion_signatures_t, _Env...>, stdexec::__mbind_back< stdexec::__mtry_q, - __sequence_completion_signatures<_Sequence, _Env...> + __sequence_completion_signatures_t<_Sequence, _Env...> > >, - item_types_of_t<_Sequence, _Env...> + __item_types_of_t<_Sequence, _Env...> >; - template + template concept sequence_receiver_from = stdexec::receiver<_Receiver> - && stdexec::sender_in<_Sender, stdexec::env_of_t<_Receiver>> + && stdexec::sender_in<_Sequence, stdexec::env_of_t<_Receiver>> && sequence_receiver_of< _Receiver, - item_types_of_t<_Sender, stdexec::env_of_t<_Receiver>> + __item_types_of_t<_Sequence, stdexec::env_of_t<_Receiver>> > - && ((sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> + && ((sequence_sender_in<_Sequence, stdexec::env_of_t<_Receiver>> && stdexec::receiver_of< _Receiver, stdexec::completion_signatures_of_t< - _Sender, + _Sequence, stdexec::env_of_t<_Receiver> > >) - || (!sequence_sender_in<_Sender, stdexec::env_of_t<_Receiver>> && stdexec::__receiver_from<__sequence_sndr::__stopped_means_break_t<_Receiver>, next_sender_of_t<_Receiver, _Sender>>) ); + || (!sequence_sender_in<_Sequence, stdexec::env_of_t<_Receiver>> + && stdexec::__receiver_from<__sequence_sndr::__stopped_means_break_t<_Receiver>, next_sender_of_t<_Receiver, _Sequence>>) ); namespace __sequence_sndr { struct subscribe_t; + struct _NO_USABLE_SUBSCRIBE_CUSTOMIZATION_FOUND_ { + void operator()() const noexcept = delete; + }; + template - using __single_sender_completion_sigs = __if_c< + using __next_sender_completion_sigs_t = __if_c< unstoppable_token>, completion_signatures, completion_signatures @@ -330,112 +446,142 @@ namespace exec { && sequence_receiver_of<_Receiver, item_types>> && sender_to, __stopped_means_break_t<_Receiver>>; - template - concept __subscribeable_with_member = receiver<_Receiver> - && sequence_sender_in<_Sender, env_of_t<_Receiver>> - && sequence_receiver_from<_Receiver, _Sender> - && requires(_Sender&& __sndr, _Receiver&& __rcvr) { + template + concept __subscribable_with_static_member = receiver<_Receiver> + && sequence_sender_in<_Sequence, env_of_t<_Receiver>> + && sequence_receiver_from<_Receiver, _Sequence> + && requires(_Sequence&& __sequence, _Receiver&& __rcvr) { + { + STDEXEC_REMOVE_REFERENCE(_Sequence) + ::subscribe(static_cast<_Sequence &&>(__sequence), static_cast<_Receiver &&>(__rcvr)) + }; + }; + + template + concept __subscribable_with_member = receiver<_Receiver> + && sequence_sender_in<_Sequence, env_of_t<_Receiver>> + && sequence_receiver_from<_Receiver, _Sequence> + && requires(_Sequence&& __sequence, _Receiver&& __rcvr) { { - static_cast<_Sender &&>(__sndr) + static_cast<_Sequence &&>(__sequence) .subscribe(static_cast<_Receiver &&>(__rcvr)) }; }; - template - concept __subscribeable_with_tag_invoke = receiver<_Receiver> - && sequence_sender_in<_Sender, env_of_t<_Receiver>> - && sequence_receiver_from<_Receiver, _Sender> - && tag_invocable; + template + concept __subscribable_with_tag_invoke = receiver<_Receiver> + && sequence_sender_in<_Sequence, env_of_t<_Receiver>> + && sequence_receiver_from<_Receiver, _Sequence> + && tag_invocable; struct subscribe_t { - template - using __tfx_sndr = __tfx_sender<_Sender, env_of_t<_Receiver>>; + template + using __tfx_sequence_t = __tfx_sequence_t<_Sequence, env_of_t<_Receiver>>; - template + template static constexpr auto __select_impl() noexcept { - using _Domain = __late_domain_of_t<_Sender, env_of_t<_Receiver&>>; - constexpr bool _NothrowTfxSender = - __nothrow_callable>; - using _TfxSender = __tfx_sndr<_Sender, _Receiver>; - if constexpr (__next_connectable<_TfxSender, _Receiver>) { - using _Result = connect_result_t< - next_sender_of_t<_Receiver, _TfxSender>, + using __domain_t = __late_domain_of_t<_Sequence, env_of_t<_Receiver&>>; + constexpr bool _NothrowTfxSequence = + __nothrow_callable>; + using __tfx_sequence_t = __tfx_sequence_t<_Sequence, _Receiver>; + if constexpr (__next_connectable<__tfx_sequence_t, _Receiver>) { + using __result_t = connect_result_t< + next_sender_of_t<_Receiver, __tfx_sequence_t>, __stopped_means_break_t<_Receiver> >; static_assert( - operation_state<_Result>, + operation_state<__result_t>, "stdexec::connect(sender, receiver) must return a type that " "satisfies the operation_state concept"); constexpr bool _Nothrow = __nothrow_connectable< - next_sender_of_t<_Receiver, _TfxSender>, + next_sender_of_t<_Receiver, __tfx_sequence_t>, __stopped_means_break_t<_Receiver> >; - return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr); - } else if constexpr (__subscribeable_with_member<_TfxSender, _Receiver>) { - using _Result = decltype(__declval<_TfxSender>().subscribe(__declval<_Receiver>())); + return static_cast<__result_t (*)() noexcept(_Nothrow)>(nullptr); + } else if constexpr (__subscribable_with_static_member<__tfx_sequence_t, _Receiver>) { + using __result_t = decltype(STDEXEC_REMOVE_REFERENCE(__tfx_sequence_t):: + subscribe(__declval<__tfx_sequence_t>(), __declval<_Receiver>())); + static_assert( + operation_state<__result_t>, + "Sequence::subscribe(sender, receiver) must return a type that " + "satisfies the operation_state concept"); + constexpr bool _Nothrow = _NothrowTfxSequence + && noexcept(STDEXEC_REMOVE_REFERENCE(__tfx_sequence_t) + ::subscribe(__declval<__tfx_sequence_t>(), __declval<_Receiver>())); + return static_cast<__result_t (*)() noexcept(_Nothrow)>(nullptr); + } else if constexpr (__subscribable_with_member<__tfx_sequence_t, _Receiver>) { + using __result_t = decltype(__declval<__tfx_sequence_t>().subscribe(__declval<_Receiver>())); static_assert( - operation_state<_Result>, - "Sender::subscribe(sender, receiver) must return a type that " + operation_state<__result_t>, + "Sequence::subscribe(sender, receiver) must return a type that " "satisfies the operation_state concept"); - constexpr bool _Nothrow = _NothrowTfxSender - && noexcept(__declval<_TfxSender>() + constexpr bool _Nothrow = _NothrowTfxSequence + && noexcept(__declval<__tfx_sequence_t>() .subscribe(__declval<_Receiver>())); - return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr); - } else if constexpr (__subscribeable_with_tag_invoke<_TfxSender, _Receiver>) { - using _Result = tag_invoke_result_t; + return static_cast<__result_t (*)() noexcept(_Nothrow)>(nullptr); + } else if constexpr (__subscribable_with_tag_invoke<__tfx_sequence_t, _Receiver>) { + using __result_t = tag_invoke_result_t; static_assert( - operation_state<_Result>, + operation_state<__result_t>, "exec::subscribe(sender, receiver) must return a type that " "satisfies the operation_state concept"); - constexpr bool _Nothrow = _NothrowTfxSender - && nothrow_tag_invocable; - return static_cast<_Result (*)() noexcept(_Nothrow)>(nullptr); + constexpr bool _Nothrow = _NothrowTfxSequence + && nothrow_tag_invocable; + return static_cast<__result_t (*)() noexcept(_Nothrow)>(nullptr); + } else if constexpr (__is_debug_env>) { + using __result_t = __debug::__debug_operation; + return static_cast<__result_t (*)() noexcept(_NothrowTfxSequence)>(nullptr); } else { - return static_cast<__debug::__debug_operation (*)() noexcept>(nullptr); + return _NO_USABLE_SUBSCRIBE_CUSTOMIZATION_FOUND_(); } } - template - using __select_impl_t = decltype(__select_impl<_Sender, _Receiver>()); - - template - requires __next_connectable<__tfx_sndr<_Sender, _Receiver>, _Receiver> - || __subscribeable_with_member<__tfx_sndr<_Sender, _Receiver>, _Receiver> - || __subscribeable_with_tag_invoke<__tfx_sndr<_Sender, _Receiver>, _Receiver> - || __is_debug_env> - auto operator()(_Sender&& __sndr, _Receiver&& __rcvr) const - noexcept(__nothrow_callable<__select_impl_t<_Sender, _Receiver>>) - -> __call_result_t<__select_impl_t<_Sender, _Receiver>> { - using _TfxSender = __tfx_sndr<_Sender, _Receiver>; + template + using __select_impl_t = decltype(__select_impl<_Sequence, _Receiver>()); + + template + auto operator()(_Sequence&& __sequence, _Receiver&& __rcvr) const + noexcept(__nothrow_callable<__select_impl_t<_Sequence, _Receiver>>) + -> __call_result_t<__select_impl_t<_Sequence, _Receiver>> { + using __tfx_sequence_t = __tfx_sequence_t<_Sequence, _Receiver>; auto&& __env = stdexec::get_env(__rcvr); - auto __domain = __get_late_domain(__sndr, __env); - if constexpr (__next_connectable<_TfxSender, _Receiver>) { - next_sender_of_t<_Receiver, _TfxSender> __next = set_next( - __rcvr, stdexec::transform_sender(__domain, static_cast<_Sender&&>(__sndr), __env)); + auto __domain = __get_late_domain(__sequence, __env); + if constexpr (__next_connectable<__tfx_sequence_t, _Receiver>) { + next_sender_of_t<_Receiver, __tfx_sequence_t> __next = set_next( + __rcvr, stdexec::transform_sender(__domain, static_cast<_Sequence&&>(__sequence), __env)); return stdexec::connect( - static_cast&&>(__next), + static_cast&&>(__next), __stopped_means_break_t<_Receiver>{static_cast<_Receiver&&>(__rcvr)}); // NOLINTNEXTLINE(bugprone-branch-clone) - } else if constexpr (__subscribeable_with_member<_TfxSender, _Receiver>) { - return stdexec::transform_sender(__domain, static_cast<_Sender&&>(__sndr), __env) + } else if constexpr (__subscribable_with_static_member<__tfx_sequence_t, _Receiver>) { + auto&& __tfx_sequence = transform_sender(__domain, static_cast<_Sequence&&>(__sequence), __env); + return __tfx_sequence + .subscribe( + static_cast<__tfx_sequence_t&&>(__tfx_sequence), + static_cast<_Receiver&&>(__rcvr)); + } else if constexpr (__subscribable_with_member<__tfx_sequence_t, _Receiver>) { + return stdexec::transform_sender(__domain, static_cast<_Sequence&&>(__sequence), __env) .subscribe(static_cast<_Receiver&&>(__rcvr)); - } else if constexpr (__subscribeable_with_tag_invoke<_TfxSender, _Receiver>) { + } else if constexpr (__subscribable_with_tag_invoke<__tfx_sequence_t, _Receiver>) { return stdexec::tag_invoke( subscribe_t{}, - stdexec::transform_sender(__domain, static_cast<_Sender&&>(__sndr), __env), + stdexec::transform_sender(__domain, static_cast<_Sequence&&>(__sequence), __env), static_cast<_Receiver&&>(__rcvr)); - } else if constexpr (enable_sequence_sender>) { + } else if constexpr (enable_sequence_sender>) { // This should generate an instantiate backtrace that contains useful // debugging information. - return stdexec::transform_sender(__domain, static_cast<_Sender&&>(__sndr), __env) - .subscribe(static_cast<_Receiver&&>(__rcvr)); + auto&& __tfx_sequence = transform_sender(__domain, static_cast<_Sequence&&>(__sequence), __env); + return __tfx_sequence + .subscribe( + static_cast<__tfx_sequence_t&&>(__tfx_sequence), + static_cast<_Receiver&&>(__rcvr)); } else { // This should generate an instantiate backtrace that contains useful // debugging information. - next_sender_of_t<_Receiver, _TfxSender> __next = set_next( - __rcvr, stdexec::transform_sender(__domain, static_cast<_Sender&&>(__sndr), __env)); + next_sender_of_t<_Receiver, __tfx_sequence_t> __next = set_next( + __rcvr, stdexec::transform_sender(__domain, static_cast<_Sequence&&>(__sequence), __env)); return stdexec::connect( - static_cast&&>(__next), + static_cast&&>(__next), __stopped_means_break_t<_Receiver>{static_cast<_Receiver&&>(__rcvr)}); } } @@ -445,21 +591,21 @@ namespace exec { } }; - template - using subscribe_result_t = __call_result_t; + template + using subscribe_result_t = __call_result_t; } // namespace __sequence_sndr - using __sequence_sndr::__single_sender_completion_sigs; + using __sequence_sndr::__next_sender_completion_sigs_t; using __sequence_sndr::subscribe_t; inline constexpr subscribe_t subscribe{}; using __sequence_sndr::subscribe_result_t; - template + template concept sequence_sender_to = - sequence_receiver_from<_Receiver, _Sender> && requires(_Sender&& __sndr, _Receiver&& __rcvr) { - subscribe(static_cast<_Sender &&>(__sndr), static_cast<_Receiver &&>(__rcvr)); + sequence_receiver_from<_Receiver, _Sequence> && requires(_Sequence&& __sequence, _Receiver&& __rcvr) { + subscribe(static_cast<_Sequence &&>(__sequence), static_cast<_Receiver &&>(__rcvr)); }; template @@ -484,4 +630,158 @@ namespace exec { } } } -} // namespace exec \ No newline at end of file + + //////////////////////////////////////////////////////////////////////////////// +# define STDEXEC_ERROR_GET_ITEM_TYPES_RETURNED_AN_ERROR \ + "\n" \ + "\n" \ + "Trying to compute the sequences's item types resulted in an error. See\n" \ + "the rest of the compiler diagnostic for clues. Look for the string \"_ERROR_\".\n" + +# define STDEXEC_ERROR_GET_ITEM_TYPES_HAS_INVALID_RETURN_TYPE \ + "\n" \ + "\n" \ + "The member function `get_item_types` of the sequence returned an\n" \ + "invalid type.\n" \ + "\n" \ + "A sender's `get_item_types` function must return a specialization of\n" \ + "`exec::item_types<...>`, as follows:\n" \ + "\n" \ + " class MySequence\n" \ + " {\n" \ + " public:\n" \ + " using sender_concept = exec::sequence_sender_t;\n" \ + "\n" \ + " template \n" \ + " auto get_item_types(_Env&&...) -> exec::item_types<\n" \ + " // This sequence produces void items...\n" \ + " stdexec::__call_result_t>\n" \ + " {\n" \ + " return {};\n" \ + " }\n" \ + " ...\n" \ + " };\n" + + // Used to report a meaningful error message when the sender_in + // concept check fails. + template + auto __diagnose_sequence_concept_failure() { + if constexpr (!enable_sequence_sender>) { + static_assert(enable_sequence_sender<_Sequence>, STDEXEC_ERROR_ENABLE_SENDER_IS_FALSE); + } else if constexpr (!stdexec::__detail::__consistent_completion_domains<_Sequence>) { + static_assert( + stdexec::__detail::__consistent_completion_domains<_Sequence>, + "The completion schedulers of the sequence do not have " + "consistent domains. This is likely a " + "bug in the sequence implementation."); + } else if constexpr (!std::move_constructible>) { + static_assert( + std::move_constructible>, "The sequence type is not move-constructible."); + } else if constexpr (!std::constructible_from, _Sequence>) { + static_assert( + std::constructible_from, _Sequence>, + "The sequence cannot be decay-copied. Did you forget a std::move?"); + } else { + using __items_t = __item_types_of_t<_Sequence, _Env...>; + if constexpr (stdexec::__same_as<__items_t, __sequence_sndr::__unrecognized_sequence_error_t<_Sequence, _Env...>>) { + static_assert(stdexec::__mnever<__items_t>, STDEXEC_ERROR_CANNOT_COMPUTE_COMPLETION_SIGNATURES); + } else if constexpr (stdexec::__merror<__items_t>) { + static_assert( + !stdexec::__merror<__items_t>, STDEXEC_ERROR_GET_ITEM_TYPES_RETURNED_AN_ERROR); + } else if constexpr (!__well_formed_item_senders<_Sequence>) { + static_assert( + __well_formed_item_senders<_Sequence>, + STDEXEC_ERROR_GET_ITEM_TYPES_HAS_INVALID_RETURN_TYPE); + } else { + stdexec::__diagnose_sender_concept_failure<_Sequence, _Env...>(); + } +#if STDEXEC_MSVC() || STDEXEC_NVHPC() + // MSVC and NVHPC need more encouragement to print the type of the + // error. + _Completions __what = 0; +#endif + } + } + + namespace __debug { + + template + struct __valid_next { + template + requires stdexec::__one_of<_Item, _Items...> + STDEXEC_ATTRIBUTE(host, device) + stdexec::__call_result_t set_next(_Item&&) noexcept { + STDEXEC_TERMINATE(); + return stdexec::just(); + } + }; + + template + struct __debug_sequence_sender_receiver { + using __t = __debug_sequence_sender_receiver; + using __id = __debug_sequence_sender_receiver; + using receiver_concept = stdexec::receiver_t; + }; + + template + struct __debug_sequence_sender_receiver<_CvrefSequenceId, _Env, stdexec::completion_signatures<_Sigs...>, item_types<_Items...>> + : __valid_completions<__normalize_sig_t<_Sigs>...> + , __valid_next<_Items...> { + using __t = __debug_sequence_sender_receiver; + using __id = __debug_sequence_sender_receiver; + using receiver_concept = stdexec::receiver_t; + + STDEXEC_ATTRIBUTE(host, device) auto get_env() const noexcept -> __debug_env_t<_Env> { + STDEXEC_TERMINATE(); + } + }; + + template , class _Sequence> + void __debug_sequence_sender(_Sequence&& __sequence, const _Env& = {}) { + if constexpr (!__is_debug_env<_Env>) { + if constexpr (sequence_sender_in<_Sequence, _Env>) { + using __sigs_t = stdexec::__completion_signatures_of_t<_Sequence, __debug_env_t<_Env>>; + using __item_types_t = __sequence_sndr::__item_types_of_t<_Sequence, __debug_env_t<_Env>>; + using __receiver_t = __debug_sequence_sender_receiver, _Env, __sigs_t, __item_types_t>; + if constexpr (!std::same_as<__sigs_t, __debug::__completion_signatures> || !std::same_as<__item_types_t, __debug::__item_types>) { + using __operation_t = exec::subscribe_result_t<_Sequence, __receiver_t>; + //static_assert(receiver_of<_Receiver, _Sigs>); + if constexpr (!std::same_as<__operation_t, __debug_operation>) { + if (sizeof(_Sequence) == ~0ul) { // never true + auto __op = subscribe(static_cast<_Sequence&&>(__sequence), __receiver_t{}); + stdexec::start(__op); + } + } + } + } else { + __diagnose_sequence_concept_failure<_Sequence, _Env>(); + } + } + } + } // namespace __debug + using __debug::__debug_sequence_sender; + + #if STDEXEC_ENABLE_EXTRA_TYPE_CHECKING() + // __checked_completion_signatures is for catching logic bugs in a sender's metadata. If sender + // and sender_in are both true, then they had better report the same metadata. This + // completion signatures wrapper enforces that at compile time. + template + auto __checked_item_types(_Sequence && __sequence, _Env &&... __env) noexcept { + using __completions_t = + decltype(get_item_types(stdexec::__declval<_Sequence>(), stdexec::__declval<_Env>()...)); + // (void)__sequence; + // [](auto&&...){}(__env...); + exec::__debug_sequence_sender(static_cast<_Sequence &&>(__sequence), __env...); + return __completions_t{}; + } + + template + requires sequence_sender_in<_Sequence, _Env...> + using item_types_of_t = + decltype(exec::__checked_item_types(stdexec::__declval<_Sequence>(), stdexec::__declval<_Env>()...)); +#else + template + requires sequence_sender_in<_Sequence, _Env...> + using item_types_of_t = __item_types_of_t<_Sequence, _Env...>; +#endif +} // namespace exec diff --git a/test/exec/CMakeLists.txt b/test/exec/CMakeLists.txt index 50eff8e79..eebb67cdd 100644 --- a/test/exec/CMakeLists.txt +++ b/test/exec/CMakeLists.txt @@ -50,6 +50,7 @@ set(exec_test_sources sequence/test_ignore_all_values.cpp sequence/test_iterate.cpp sequence/test_transform_each.cpp + sequence/test_merge.cpp $<$:../execpools/test_tbb_thread_pool.cpp> $<$:../execpools/test_taskflow_thread_pool.cpp> $<$:../execpools/test_asio_thread_pool.cpp> diff --git a/test/exec/sequence/test_merge.cpp b/test/exec/sequence/test_merge.cpp new file mode 100644 index 000000000..2c832fd1b --- /dev/null +++ b/test/exec/sequence/test_merge.cpp @@ -0,0 +1,247 @@ +/* + * Copyright (c) 2023 Maikel Nadolski + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "exec/sequence/merge.hpp" + +#include "exec/sequence/empty_sequence.hpp" +#include "exec/sequence/iterate.hpp" +#include "exec/sequence/ignore_all_values.hpp" +#include "exec/sequence/transform_each.hpp" +#include "exec/sequence.hpp" +#include "exec/sequence_senders.hpp" +#include "exec/trampoline_scheduler.hpp" +#include "exec/static_thread_pool.hpp" +#include "stdexec/__detail/__just.hpp" +#include "stdexec/__detail/__meta.hpp" +#include "stdexec/__detail/__continues_on.hpp" +#include "stdexec/__detail/__upon_error.hpp" +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace { + +struct null_receiver { + using __id = null_receiver; + using __t = null_receiver; + using receiver_concept = ex::receiver_t; + + void set_value() noexcept { + } + + template + void set_error(_Error&& ) noexcept { + } + + void set_stopped() noexcept { + } + + [[nodiscard]] + auto get_env() const noexcept -> ex::env<> { + return {}; + } + + struct ignore_values_fn_t { + template + void operator()(_Vs&&...) const noexcept {} + }; + + template + [[nodiscard]] + auto set_next(_Item&& __item) & noexcept(ex::__nothrow_decay_copyable<_Item>) + -> stdexec::__call_result_t, + ignore_values_fn_t> { + return stdexec::upon_error(stdexec::then(static_cast<_Item&&>(__item), ignore_values_fn_t{}), ignore_values_fn_t{}); + } +}; + + TEST_CASE( + "merge - merge two sequence senders of no elements", + "[sequence_senders][merge][empty_sequence]") { + int counter = 0; + auto merged = exec::merge(exec::empty_sequence(), exec::empty_sequence()); + auto op = exec::subscribe(merged, null_receiver{}); + ex::start(op); + CHECK(counter == 0); + } + + TEST_CASE( + "merge - merge three sequence senders of no elements", + "[sequence_senders][merge][empty_sequence]") { + int counter = 0; + auto merged = exec::merge(exec::empty_sequence(), exec::empty_sequence(), exec::empty_sequence()); + auto op = exec::subscribe(merged, null_receiver{}); + ex::start(op); + CHECK(counter == 0); + } + + TEST_CASE( + "merge - merge sender of 2 senders", + "[sequence_senders][merge]") { + int value = 0; + int count = 0; + auto merged = exec::merge(ex::just(84), ex::just(-42)); + auto transformed = exec::transform_each(merged, ex::then([&value, &count](int x) noexcept { + value += x; + ++count; + })); + auto op = exec::subscribe(transformed, null_receiver{}); + ex::start(op); + CHECK(value == 42); + CHECK(count == 2); + } + + TEST_CASE( + "merge - merge sender of 2 senders and ignores all values", + "[sequence_senders][merge][ignore_all_values]") { + int value = 0; + int count = 0; + auto merged = exec::merge(ex::just(84), ex::just(-42)); + auto transformed = exec::transform_each(merged, ex::then([&value, &count](int x) { + value += x; + ++count; + return value; + })) + | exec::ignore_all_values(); + ex::sync_wait(transformed); + CHECK(value == 42); + CHECK(count == 2); + } + +#if STDEXEC_HAS_STD_RANGES() + TEST_CASE( + "merge - merge sender merges all items", + "[sequence_senders][merge][iterate]") { + auto range = [](auto from, auto to) { + return exec::iterate(std::views::iota(from, to)); + }; + auto then_each = [](auto f) { + return exec::transform_each(ex::then(f)); + }; + // this trampoline is used to interleave the merged iterate() sequences + // the parameters set the max inline schedule recursion depth and max + // inline schedule stack size + exec::trampoline_scheduler sched{16, 512}; + int total = 0; + int count = 0; + std::ptrdiff_t max = 0; + auto sum = exec::merge(range(100, 120), range(200, 220), range(300, 320)) + | then_each([&total, &count, &max](int x) noexcept { + std::ptrdiff_t current = 0; + current = std::abs(reinterpret_cast(¤t) - reinterpret_cast(&max)); + max = current > max ? current : max; + UNSCOPED_INFO("item: " << x << ", stack size: " << current); + total += x; + ++count; + }); + // this causes both iterate sequences to use the same trampoline. + ex::sync_wait(exec::sequence( + stdexec::schedule(sched), + exec::ignore_all_values(sum))); + UNSCOPED_INFO("max stack size: " << max); + CHECK(total == 12570); + CHECK(count == 60); + } + + TEST_CASE( + "merge - merge sender merges all items from multiple threads", + "[sequence_senders][static_thread_pool][merge][iterate]") { + + exec::static_thread_pool ctx0{1}; + ex::scheduler auto sched0 = ctx0.get_scheduler(); + exec::static_thread_pool ctx1{1}; + ex::scheduler auto sched1 = ctx1.get_scheduler(); + exec::static_thread_pool ctx2{1}; + ex::scheduler auto sched2 = ctx2.get_scheduler(); + exec::static_thread_pool ctx3{1}; + ex::scheduler auto sched3 = ctx3.get_scheduler(); + + auto range = [](auto from, auto to) { + return exec::iterate(std::views::iota(from, to)); + }; + auto then_each = [](auto f) { + return exec::transform_each(ex::then(f)); + }; + auto continues_each_on = [](auto sched) { + return exec::transform_each(ex::continues_on(sched)); + }; + int total = 0; + int count = 0; + auto sum = exec::merge( + range(100, 120) | continues_each_on(sched0), + range(200, 220) | continues_each_on(sched1), + range(300, 320) | continues_each_on(sched2)) + | then_each([](int x) noexcept { + // runs on sched0 and sched1 and sched2 in parallel. + // access to shared data would need to be protected + return std::make_tuple(x, std::this_thread::get_id()); + }) + | continues_each_on(sched3) + | then_each([&total, &count](auto v) { + // runs only on sched3, which is a strand (a static + // pool with one thread) + // it is safe to use shared data here + auto [x, id] = v; + total += x; + ++count; + UNSCOPED_INFO("item: " << x + << ", from thread id: " << id + << ", on thread id: " << std::this_thread::get_id()); + }); + ex::sync_wait(exec::sequence( + ex::schedule(sched3), + exec::ignore_all_values(sum))); + CHECK(total == 12570); + CHECK(count == 60); + } +#endif + + struct my_domain { + template Sender, class Env> + static auto transform_sender(Sender&&, const Env&) { + return ex::just(int{21}); + } + }; + + TEST_CASE("merge - can be customized late", "[merge][ignore_all_values]") { + // The customization will return a different value + basic_inline_scheduler sched; + int result = 0; + int count = 0; + auto start = ex::just(std::string{"hello"}); + auto with_scheduler = ex::write_env(ex::prop{ex::get_scheduler, inline_scheduler()}); + auto adaptor = ex::on(sched, ex::then([](std::string x) { return x + ", world"; })) + | with_scheduler; + auto snd = exec::merge( + start | exec::transform_each(adaptor), + start | exec::transform_each(adaptor)) + | exec::transform_each(ex::then([&](int x) { result += x; ++count; })) + | exec::ignore_all_values(); + ex::sync_wait(snd); + CHECK(result == 42); + CHECK(count == 2); + } + +} // namespace