diff --git a/include/nvexec/stream/common.cuh b/include/nvexec/stream/common.cuh index fa0c8b989..618082793 100644 --- a/include/nvexec/stream/common.cuh +++ b/include/nvexec/stream/common.cuh @@ -532,7 +532,7 @@ namespace nv::execution {} template - STDEXEC_ATTRIBUTE(host, device) + STDEXEC_ATTRIBUTE(device) void set_value(Args&&... args) noexcept { using tuple_t = decayed_tuple_t; @@ -540,20 +540,13 @@ namespace nv::execution producer_(task_); } - STDEXEC_ATTRIBUTE(host, device) void set_stopped() noexcept - { - using tuple_t = decayed_tuple_t; - variant_->template emplace(set_stopped_t()); - producer_(task_); - } - template - STDEXEC_ATTRIBUTE(host, device) + STDEXEC_ATTRIBUTE(device) void set_error(Error&& err) noexcept { if constexpr (__decays_to) { - // What is `exception_ptr` but death pending + // What is `exception_ptr` but death pending? using tuple_t = decayed_tuple_t; variant_->template emplace(STDEXEC::set_error, cudaErrorUnknown); } @@ -565,6 +558,15 @@ namespace nv::execution producer_(task_); } + STDEXEC_ATTRIBUTE(device) + void set_stopped() noexcept + { + using tuple_t = decayed_tuple_t; + variant_->template emplace(set_stopped_t()); + producer_(task_); + } + + [[nodiscard]] auto get_env() const noexcept -> Env const & { return *env_; diff --git a/include/nvexec/stream/repeat_n.cuh b/include/nvexec/stream/repeat_n.cuh index 7d36412df..70722397f 100644 --- a/include/nvexec/stream/repeat_n.cuh +++ b/include/nvexec/stream/repeat_n.cuh @@ -68,13 +68,8 @@ namespace nv::execution::_strm { using operation_state_concept = STDEXEC::operation_state_t; - using scheduler_t = - STDEXEC::__result_of, - STDEXEC::env_of_t, - STDEXEC::env_of_t>; - - using inner_sender_t = - STDEXEC::__result_of, Sender&>; + using scheduler_t = __completion_scheduler_of_t>; + using inner_sender_t = STDEXEC::__result_of; using inner_opstate_t = STDEXEC::connect_result_t>; explicit opstate(Sender&& sndr, Receiver rcvr, std::size_t count, scheduler_t sched) @@ -91,9 +86,9 @@ namespace 
nv::execution::_strm auto& _connect() { - inner_opstate_.__emplace_from(STDEXEC::connect, - exec::sequence(STDEXEC::schedule(sched_), sndr_), - receiver{*this}); + return inner_opstate_.__emplace_from(STDEXEC::connect, + STDEXEC::starts_on(sched_, sndr_), + receiver{*this}); } template diff --git a/include/nvexec/stream/starts_on.cuh b/include/nvexec/stream/starts_on.cuh new file mode 100644 index 000000000..f28dffddd --- /dev/null +++ b/include/nvexec/stream/starts_on.cuh @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2026 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// clang-format Language: Cpp + +#pragma once + +#include "../../stdexec/execution.hpp" +#include "../../stdexec/functional.hpp" + +#include "let_xxx.cuh" + +#include "common.cuh" + +namespace nv::execution::_strm +{ + template <> + struct transform_sender_for + { + template + auto operator()(Env const &, STDEXEC::starts_on_t, Scheduler&& sched, Sender&& sndr) const + { + return STDEXEC::let_value(STDEXEC::schedule(sched), + STDEXEC::__always(static_cast(sndr))); + } + }; +} // namespace nv::execution::_strm diff --git a/include/nvexec/stream/then.cuh b/include/nvexec/stream/then.cuh index 2de508ced..70010104e 100644 --- a/include/nvexec/stream/then.cuh +++ b/include/nvexec/stream/then.cuh @@ -71,10 +71,9 @@ namespace nv::execution::_strm requires std::invocable...> void set_value(Args&&... 
args) noexcept { - using result_t = std::invoke_result_t...>; - constexpr bool does_not_return_a_value = std::is_same_v; - _strm::opstate_base& opstate = opstate_; - cudaStream_t stream = opstate.get_stream(); + using result_t = std::invoke_result_t...>; + constexpr bool does_not_return_a_value = std::is_same_v; + cudaStream_t stream = opstate_.get_stream(); if constexpr (does_not_return_a_value) { @@ -83,29 +82,29 @@ namespace nv::execution::_strm if (cudaError_t status = STDEXEC_LOG_CUDA_API(cudaPeekAtLastError()); status == cudaSuccess) { - opstate.propagate_completion_signal(STDEXEC::set_value); + opstate_.propagate_completion_signal(STDEXEC::set_value); } else { - opstate.propagate_completion_signal(STDEXEC::set_error, std::move(status)); + opstate_.propagate_completion_signal(STDEXEC::set_error, std::move(status)); } } else { using decayed_result_t = __decay_t; - auto* d_result = static_cast(opstate.temp_storage_); + auto* d_result = static_cast(opstate_.temp_storage_); _then_kernel_with_result <<<1, 1, 0, stream>>>(std::move(f_), d_result, static_cast(args)...); - opstate.defer_temp_storage_destruction(d_result); + opstate_.defer_temp_storage_destruction(d_result); if (cudaError_t status = STDEXEC_LOG_CUDA_API(cudaPeekAtLastError()); status == cudaSuccess) { - opstate.propagate_completion_signal(STDEXEC::set_value, std::move(*d_result)); + opstate_.propagate_completion_signal(STDEXEC::set_value, std::move(*d_result)); } else { - opstate.propagate_completion_signal(STDEXEC::set_error, std::move(status)); + opstate_.propagate_completion_signal(STDEXEC::set_error, std::move(status)); } } } @@ -185,7 +184,7 @@ namespace nv::execution::_strm static_cast(self).sndr_, static_cast(rcvr), [&](_strm::opstate_base& stream_provider) -> receiver_t - { return receiver_t(self.fun_, stream_provider); }); + { return receiver_t(static_cast(self).fun_, stream_provider); }); } STDEXEC_EXPLICIT_THIS_END(connect) @@ -209,11 +208,11 @@ namespace nv::execution::_strm struct 
transform_sender_for { template - auto operator()(Env const &, __ignore, Fn fun, CvSender&& sndr) const + auto operator()(Env const &, __ignore, Fn&& fun, CvSender&& sndr) const { if constexpr (stream_completing_sender) { - using _sender_t = then_sender<__decay_t, Fn>; + using _sender_t = then_sender<__decay_t, __decay_t>; return _sender_t{static_cast(sndr), static_cast(fun)}; } else diff --git a/include/nvexec/stream_context.cuh b/include/nvexec/stream_context.cuh index 05a99d68c..f5128f73f 100644 --- a/include/nvexec/stream_context.cuh +++ b/include/nvexec/stream_context.cuh @@ -35,6 +35,7 @@ #include "stream/schedule_from.cuh" // IWYU pragma: export #include "stream/split.cuh" // IWYU pragma: export #include "stream/start_detached.cuh" // IWYU pragma: export +#include "stream/starts_on.cuh" // IWYU pragma: export #include "stream/sync_wait.cuh" // IWYU pragma: export #include "stream/then.cuh" // IWYU pragma: export #include "stream/upon_error.cuh" // IWYU pragma: export diff --git a/include/stdexec/__detail/__bulk.hpp b/include/stdexec/__detail/__bulk.hpp index c0b0410ef..fdbf8147b 100644 --- a/include/stdexec/__detail/__bulk.hpp +++ b/include/stdexec/__detail/__bulk.hpp @@ -242,6 +242,24 @@ namespace STDEXEC template STDEXEC_HOST_DEVICE_DEDUCTION_GUIDE __as_bulk_chunked_fn(_Fun) -> __as_bulk_chunked_fn<_Fun>; + template + struct __attrs : env<__fwd_env_t>> + { + using __base_t = env<__fwd_env_t>>; + using __base_t::query; + + constexpr explicit __attrs(_Child const & __child) noexcept + : __base_t{__fwd_env(STDEXEC::get_env(__child))} + {} + + template + STDEXEC_ATTRIBUTE(nodiscard, always_inline, host, device) + constexpr auto query(__get_completion_behavior_t, _Env&&...) 
const noexcept + { + return STDEXEC::__get_completion_behavior(); + } + }; + template struct __impl_base : __sexpr_defaults { @@ -252,9 +270,10 @@ namespace STDEXEC using __shape_t = decltype(__decay_t<__data_of<_Sender>>::__shape_); // Forward the child sender's environment (which contains completion scheduler) - static constexpr auto __get_attrs = [](__ignore, __ignore, auto const & __child) noexcept + static constexpr auto __get_attrs = // + [](__ignore, __ignore, _Child const & __child) noexcept { - return __fwd_env(STDEXEC::get_env(__child)); + return __attrs{__child}; }; template diff --git a/include/stdexec/__detail/__continues_on.hpp b/include/stdexec/__detail/__continues_on.hpp index 29daade62..b57968823 100644 --- a/include/stdexec/__detail/__continues_on.hpp +++ b/include/stdexec/__detail/__continues_on.hpp @@ -174,13 +174,13 @@ namespace STDEXEC return false; } - _Scheduler const & __sch_; - _Sender const & __sndr_; + _Scheduler __sch_; + env_of_t<_Sender> __attrs_; public: - constexpr explicit __attrs(_Scheduler const & __sch, _Sender const & __sndr) noexcept - : __sch_(__sch) - , __sndr_(__sndr) + constexpr explicit __attrs(_Scheduler __sch, env_of_t<_Sender> __attrs) noexcept + : __sch_(static_cast<_Scheduler&&>(__sch)) + , __attrs_(static_cast&&>(__attrs)) {} //! @brief Queries the completion scheduler for a given @c _SetTag. @@ -221,7 +221,7 @@ namespace STDEXEC env_of_t<_Sender>, __fwd_env_t<_Env>...> { - return get_completion_scheduler<_SetTag>(get_env(__sndr_), __fwd_env(__env)...); + return get_completion_scheduler<_SetTag>(__attrs_, __fwd_env(__env)...); } //! @brief Queries the completion domain for a given @c _SetTag. @@ -295,15 +295,16 @@ namespace STDEXEC } //! @brief Forwards other queries to the underlying sender's environment. - //! @pre @c _Tag is a forwarding query but not a completion query. - template <__forwarding_query _Tag, class... _Args> - requires(!__completion_query<_Tag>) && __queryable_with, _Tag, _Args...> + //! 
@pre @c _Query is a forwarding query but not a completion query. + template <__forwarding_query _Query, class... _Args> + requires(!__completion_query<_Query>) + && __queryable_with, _Query, _Args...> [[nodiscard]] - constexpr auto query(_Tag, _Args&&... __args) const - noexcept(__nothrow_queryable_with, _Tag, _Args...>) - -> __query_result_t, _Tag, _Args...> + constexpr auto query(_Query, _Args&&... __args) const + noexcept(__nothrow_queryable_with, _Query, _Args...>) + -> __query_result_t, _Query, _Args...> { - return get_env(__sndr_).query(_Tag{}, static_cast<_Args&&>(__args)...); + return __attrs_.query(_Query(), static_cast<_Args&&>(__args)...); } }; @@ -346,9 +347,11 @@ namespace STDEXEC public: static constexpr auto __get_attrs = - [](__ignore, auto const & __data, auto const & __child) noexcept + [](__ignore, + _Scheduler const & __data, + _Child const & __child) noexcept { - return __attrs{__data, __child}; + return __attrs<_Scheduler, _Child>{__data, STDEXEC::get_env(__child)}; }; template diff --git a/include/stdexec/__detail/__on.hpp b/include/stdexec/__detail/__on.hpp index c822648ac..9c0579db5 100644 --- a/include/stdexec/__detail/__on.hpp +++ b/include/stdexec/__detail/__on.hpp @@ -28,14 +28,152 @@ #include "__schedulers.hpp" #include "__sender_adaptor_closure.hpp" #include "__sender_introspection.hpp" +#include "__sender_ref.hpp" +#include "__starts_on.hpp" #include "__utility.hpp" +STDEXEC_PRAGMA_PUSH() +STDEXEC_PRAGMA_IGNORE_GNU("-Wmissing-braces") + namespace STDEXEC { ///////////////////////////////////////////////////////////////////////////// // [execution.senders.adaptors.on] struct _CANNOT_RESTORE_EXECUTION_CONTEXT_AFTER_ON_; + namespace __on + { + // If __is_root_env<_Env> is true, then this sender has no parent, so there is no need + // to restore the execution context. We can use the inline scheduler as the scheduler + // if __env does not have one. 
+ template + using __end_sched_t = + __if_c<__is_root_env<_Env>, + inline_scheduler, + __not_a_scheduler<_WHAT_(_CANNOT_RESTORE_EXECUTION_CONTEXT_AFTER_ON_), + _WHY_(_THE_CURRENT_EXECUTION_ENVIRONMENT_DOESNT_HAVE_A_SCHEDULER_), + _WHERE_(_IN_ALGORITHM_, on_t), + _WITH_PRETTY_SENDER_<_Child>, + _WITH_ENVIRONMENT_(_Env)>>; + + // This transform_sender overload handles the case where `on` was called like `on(sch, + // sndr)`. In this case, we find the old scheduler by looking in the receiver's + // environment. + template + requires scheduler<_Scheduler> + STDEXEC_ATTRIBUTE(always_inline) + constexpr auto __transform_sender(_Scheduler&& __new_sched, + _Child&& __child, + _Env const & __env) + { + auto __default_sched = __end_sched_t<_Child, _Env>(); + auto __old_sched = __with_default(get_scheduler, __default_sched)(__env); + + return continues_on(starts_on(static_cast<_Scheduler&&>(__new_sched), + static_cast<_Child&&>(__child)), + std::move(__old_sched)); + } + + // This transform_sender overload handles the case where `on` was called like `sndr | + // on(sch, clsur)` or `on(sndr, sch, clsur)`. In this case, __child is a predecessor + // sender, so the scheduler we want to restore is the completion scheduler of __child. + template + requires(!scheduler<_Data>) + STDEXEC_ATTRIBUTE(always_inline) + constexpr auto __transform_sender(_Data&& __data, _Child&& __child, _Env const & __env) + { + auto& [__new_sched, __clsur] = __data; + auto __default_sched = __end_sched_t<_Child, _Env>(); + auto __get_sched = __with_default(get_completion_scheduler, __default_sched); + auto __old_sched = __get_sched(get_env(__child), __env); + + return continues_on(STDEXEC::__forward_like<_Data>(__clsur)( + continues_on(static_cast<_Child&&>(__child), + STDEXEC::__forward_like<_Data>(__new_sched))), + std::move(__old_sched)); + } + + template + struct __attrs_base + { + template <__forwarding_query _Query, class... 
_Args> + requires(!__completion_query<_Query>) + && __queryable_with, _Query, _Args...> + STDEXEC_ATTRIBUTE(nodiscard, always_inline, host, device) + constexpr auto query(_Query __query, _Args&&... __args) const + noexcept(__nothrow_queryable_with, _Query, _Args...>) + -> __query_result_t, _Query, _Args...> + { + return __query(STDEXEC::get_env(__child_), static_cast<_Args&&>(__args)...); + } + + _Child const & __child_; + }; + + template + struct __attrs; + + template + struct __attrs<_Child, _Scheduler, _Closure> : __attrs_base<_Child> + { + using __trnsfr_sndr_t = __result_of, _Scheduler>; + using __clsur_result_t = __call_result_t<_Closure const &, __trnsfr_sndr_t>; + template + using __old_sched_t = + __query_result_t, get_completion_scheduler_t, _Env>; + template + using __attrs_t = __trnsfr::__attrs<__old_sched_t<_Env>, __clsur_result_t>; + using __attrs_base<_Child>::query; + + explicit constexpr __attrs(_Child const & __child, + _Scheduler __sched, + _Closure const & __clsur) + : __attrs_base<_Child>{__child} + , __clsur_result_(__clsur(continues_on(__sender_proxy{__child}, std::move(__sched)))) + {} + + template + requires __completion_query<_Query> // + && __queryable_with, get_completion_scheduler_t, _Env> + && __queryable_with<__attrs_t<_Env>, _Query, _Env> + STDEXEC_ATTRIBUTE(nodiscard, always_inline, host, device) + constexpr auto query(_Query __query, _Env&& __env) const noexcept + -> __query_result_t<__attrs_t<_Env>, _Query, _Env> + { + auto&& __child_attrs = STDEXEC::get_env(this->__child_); + auto __old_sch = get_completion_scheduler(__child_attrs, __env); + auto __attrs = __attrs_t<_Env>(__old_sch, STDEXEC::get_env(__clsur_result_)); + return __query(__attrs, static_cast<_Env&&>(__env)); + } + + __clsur_result_t __clsur_result_; + }; + + template + struct __attrs<_Child, _Scheduler> : __attrs_base<_Child> + { + using __child_t = __result_of; + using __child_attrs_t = __starts_on::__attrs<_Scheduler, _Child>; + template + using __attrs_t = 
__trnsfr::__attrs<__result_of, __child_t>; + using __attrs_base<_Child>::query; + + template _Env> + requires __completion_query<_Query> && __queryable_with<__attrs_t<_Env>, _Query, _Env> + STDEXEC_ATTRIBUTE(nodiscard, always_inline, host, device) + constexpr auto query(_Query __query, _Env&& __env) const noexcept + -> __query_result_t<__attrs_t<_Env>, _Query, _Env> + { + auto&& __child_attrs = STDEXEC::get_env(this->__child_); + auto __old_sch = get_scheduler(__env); + auto __attrs = __attrs_t<_Env>(__old_sch, __child_attrs_t(__sched_, __child_attrs)); + return __query(__attrs, static_cast<_Env&&>(__env)); + } + + _Scheduler __sched_; + }; + } // namespace __on + //////////////////////////////////////////////////////////////////////////////////////////////// struct on_t { @@ -62,64 +200,23 @@ namespace STDEXEC return __closure(*this, static_cast<_Scheduler&&>(__sched), static_cast<_Closure&&>(__clsur)); } - // This transform_sender overload handles the case where `on` was called like `on(sch, - // sndr)`. In this case, we find the old scheduler by looking in the receiver's - // environment. - template <__decay_copyable _Sender, class _Env> - requires scheduler<__data_of<_Sender>> - STDEXEC_ATTRIBUTE(always_inline) - static auto transform_sender(set_value_t, _Sender&& __sndr, _Env const & __env) - { - static_assert(__sender_for<_Sender, on_t>); - auto& [__tag, __new_sched, __child] = __sndr; - auto __old_sched = __with_default(get_scheduler, __end_sched_t<_Sender, _Env>())(__env); - - return continues_on(starts_on(STDEXEC::__forward_like<_Sender>(__new_sched), - STDEXEC::__forward_like<_Sender>(__child)), - std::move(__old_sched)); - } - - // This transform_sender overload handles the case where `on` was called like `sndr | - // on(sch, clsur)` or `on(sndr, sch, clsur)`. In this case, __child is a predecessor - // sender, so the scheduler we want to restore is the completion scheduler of __child. 
template <__decay_copyable _Sender, class _Env> - requires(!scheduler<__data_of<_Sender>>) STDEXEC_ATTRIBUTE(always_inline) - static auto transform_sender(set_value_t, _Sender&& __sndr, _Env const & __env) + static auto transform_sender(set_value_t, _Sender&& __sndr, _Env&& __env) { static_assert(__sender_for<_Sender, on_t>); auto& [__tag, __data, __child] = __sndr; - auto& [__new_sched, __clsur] = __data; - - auto __old_sched = __with_default(get_completion_scheduler, - __end_sched_t<_Sender, _Env>())(get_env(__child), __env); - - return continues_on(STDEXEC::__forward_like<_Sender>(__clsur)( - continues_on(STDEXEC::__forward_like<_Sender>(__child), - STDEXEC::__forward_like<_Sender>(__new_sched))), - std::move(__old_sched)); + return __on::__transform_sender(STDEXEC::__forward_like<_Sender>(__data), + STDEXEC::__forward_like<_Sender>(__child), + static_cast<_Env&&>(__env)); } template - static auto transform_sender(set_value_t, _Sender&&, _Env const &) + static auto transform_sender(set_value_t, _Sender&&, _Env&&) { return __not_a_sender<_WHAT_(_SENDER_TYPE_IS_NOT_DECAY_COPYABLE_), _WITH_PRETTY_SENDER_<_Sender>>{}; } - - private: - // If __is_root_env<_Env> is true, then this sender has no parent, so there is no need - // to restore the execution context. We can use the inline scheduler as the scheduler - // if __env does not have one. 
- template - using __end_sched_t = - __if_c<__is_root_env<_Env>, - inline_scheduler, - __not_a_scheduler<_WHAT_(_CANNOT_RESTORE_EXECUTION_CONTEXT_AFTER_ON_), - _WHY_(_THE_CURRENT_EXECUTION_ENVIRONMENT_DOESNT_HAVE_A_SCHEDULER_), - _WHERE_(_IN_ALGORITHM_, on_t), - _WITH_PRETTY_SENDER_<__child_of<_Sender>>, - _WITH_ENVIRONMENT_(_Env)>>; }; inline constexpr on_t on{}; @@ -127,6 +224,27 @@ namespace STDEXEC template <> struct __sexpr_impl : __sexpr_defaults { + static constexpr auto __get_attrs = // + [](__ignore, _Data const & __data, _Child const & __child) noexcept + { + if constexpr (scheduler<_Data>) + { + // This is the case where `on` was called like `on(sch, sndr)`, which is equivalent + // to `continues_on(starts_on(sch, sndr), old_sch)`. + using __attrs_t = __on::__attrs<_Child, _Data>; + return __attrs_t{__child, __data}; + } + else + { + // This is the case where `on` was called like `sndr | on(sch, clsur)` or + // `on(sndr, sch, clsur)`, which is equivalent to + // `continues_on(clsur(continues_on(sndr, sch)), old_sch)`. + auto const& [__sched, __clsur] = __data; + using __attrs_t = __on::__attrs<_Child, decltype(__sched), decltype(__clsur)>; + return __attrs_t{__child, __sched, __clsur}; + } + }; + template static constexpr auto __get_completion_signatures() { @@ -135,3 +253,5 @@ namespace STDEXEC } }; } // namespace STDEXEC + +STDEXEC_PRAGMA_POP() diff --git a/include/stdexec/__detail/__sender_ref.hpp b/include/stdexec/__detail/__sender_ref.hpp new file mode 100644 index 000000000..f5e10310b --- /dev/null +++ b/include/stdexec/__detail/__sender_ref.hpp @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2021-2024 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "__execution_fwd.hpp" + +// include these after __execution_fwd.hpp +#include "__connect.hpp" +#include "__env.hpp" +#include "__get_completion_signatures.hpp" + +namespace STDEXEC +{ + // A wrapper around a sender to be used when an adaptor with a sender transform wants to + // query the transformed sender's attributes without actually transforming the sender. + template + struct __sender_proxy + { + using sender_concept = _Sender::sender_concept; + + template + using __sender_t = __copy_cvref_t<_Self, std::remove_cv_t<_Sender>>; + + template + static consteval auto get_completion_signatures() // + -> __completion_signatures_of_t<__sender_t<_Self>, _Env...> + { + return STDEXEC::get_completion_signatures<__sender_t<_Self>, _Env...>(); + } + + [[nodiscard]] + constexpr auto get_env() const noexcept -> env_of_t<_Sender> + { + return STDEXEC::get_env(__sndr_); + } + + _Sender& __sndr_; + }; + + template + STDEXEC_HOST_DEVICE_DEDUCTION_GUIDE __sender_proxy(_Sender&) -> __sender_proxy<_Sender>; + + // A reference wrapper around a multi-shot sender. Useful in adaptors like `repeat_n` + // where we want to repeatedly connect to the same sender. 
+ template + struct __sender_ref + { + using sender_concept = _Sender::sender_concept; + + template + static consteval auto + get_completion_signatures() -> __completion_signatures_of_t<_Sender&, _Env...> + { + return STDEXEC::get_completion_signatures<_Sender&, _Env...>(); + } + + template + constexpr auto connect(_Receiver __rcvr) const + noexcept(__nothrow_connectable<_Sender&, _Receiver>) -> connect_result_t<_Sender&, _Receiver> + { + return STDEXEC::connect(__sndr_, std::move(__rcvr)); + } + + [[nodiscard]] + constexpr auto get_env() const noexcept -> env_of_t<_Sender> + { + return STDEXEC::get_env(__sndr_); + } + + _Sender& __sndr_; + }; + + template + STDEXEC_HOST_DEVICE_DEDUCTION_GUIDE __sender_ref(_Sender&) -> __sender_ref<_Sender>; +} // namespace STDEXEC diff --git a/include/stdexec/__detail/__senders.hpp b/include/stdexec/__detail/__senders.hpp index b76eedd23..0ab140789 100644 --- a/include/stdexec/__detail/__senders.hpp +++ b/include/stdexec/__detail/__senders.hpp @@ -62,4 +62,8 @@ namespace STDEXEC // early sender type-checking template concept __well_formed_sender = sender_in<_Sender> || dependent_sender<_Sender>; + + template <__well_formed_sender _Sender> + constexpr void __ensure_well_formed_sender() noexcept + {} } // namespace STDEXEC diff --git a/include/stdexec/__detail/__sequence.hpp b/include/stdexec/__detail/__sequence.hpp index 0a18dd383..93bc961d8 100644 --- a/include/stdexec/__detail/__sequence.hpp +++ b/include/stdexec/__detail/__sequence.hpp @@ -200,6 +200,11 @@ namespace STDEXEC using receiver_concept = receiver_t; using __env_t = __join_env_t<_Env2 const &, __fwd_env_t>>; + template + constexpr explicit __rcvr2(__state<_Receiver, _Env2> *__self) noexcept + : __self_(__self) + {} + template constexpr void set_value(_As &&...__as) noexcept { @@ -491,6 +496,12 @@ namespace STDEXEC template inline constexpr auto __structured_binding_size_v<__seq::__sndr<_Senders...>> = -1; + namespace __detail + { + template + extern 
__declfn_t<__seq::__sndr<__demangle_t<_Senders>...>> + __demangle_v<__seq::__sndr<_Senders...>>; + } // namespace __detail } // namespace STDEXEC STDEXEC_PRAGMA_POP() diff --git a/include/stdexec/__detail/__starts_on.hpp b/include/stdexec/__detail/__starts_on.hpp index 7cab6d295..39f33f386 100644 --- a/include/stdexec/__detail/__starts_on.hpp +++ b/include/stdexec/__detail/__starts_on.hpp @@ -32,79 +32,67 @@ namespace STDEXEC { - ///////////////////////////////////////////////////////////////////////////// - // [execution.senders.adaptors.starts_on] - struct starts_on_t - { - template - constexpr auto - operator()(_Scheduler&& __sched, _Sender&& __sndr) const -> __well_formed_sender auto - { - return __make_sexpr(static_cast<_Scheduler&&>(__sched), - static_cast<_Sender&&>(__sndr)); - } - - template <__decay_copyable _Sender> - static constexpr auto transform_sender(set_value_t, _Sender&& __sndr, __ignore) - { - auto& [__tag, __sched, __child] = __sndr; - // NOT TO SPEC: the specification requires that this be implemented in terms of - // let_value(schedule(sch), []{ return child; }), but that implementation - // is inefficient on the GPU. We could customize starts_on for the GPU to use this - // implementation, but this is a good change to make for all platforms since it - // avoids unnecessarily making the child sender dependent on the completion of the - // schedule operation. - return __sequence(continues_on(just(), __sched), STDEXEC::__forward_like<_Sender>(__child)); - } - - template - static constexpr auto transform_sender(set_value_t, _Sender&&, __ignore) - { - return __not_a_sender<_SENDER_TYPE_IS_NOT_DECAY_COPYABLE_, _WITH_PRETTY_SENDER_<_Sender>>{}; - } - }; - - inline constexpr starts_on_t starts_on{}; - - template <> - struct __sexpr_impl : __sexpr_defaults + namespace __starts_on { template struct __attrs { + private: template static constexpr auto __mk_env2(_Scheduler __sch, _Env&&... 
__env) { - return env(STDEXEC::__mk_sch_env(__sch, __env...), static_cast<_Env&&>(__env)...); + return __env::__join(STDEXEC::__mk_sch_env(__sch, __env...), static_cast<_Env&&>(__env)...); } template using __env2_t = decltype(__mk_env2(__declval<_Scheduler>(), __declval<_Env>()...)); + _Scheduler __sched_; + env_of_t<_Child> __attr_; + + public: + constexpr explicit __attrs(_Scheduler __sch, env_of_t<_Child> __attr) noexcept + : __sched_(static_cast<_Scheduler&&>(__sch)) + , __attr_(__attr) + {} + + // Query for completion domain to use for algorithm dispatching. + // NOT TO SPEC + template + STDEXEC_ATTRIBUTE(nodiscard, always_inline, host, device) + constexpr auto query(get_completion_domain_t<>, _Env&&...) const noexcept + -> __completion_domain_of_t + { + return {}; + } + // Query for completion scheduler template + requires __completes_where_it_starts<_SetTag, env_of_t<_Child>, __env2_t<_Env>...> STDEXEC_ATTRIBUTE(nodiscard, always_inline, host, device) constexpr auto query(get_completion_scheduler_t<_SetTag>, _Env&&...) const noexcept -> _Scheduler - requires(__completes_inline<_SetTag, env_of_t<_Child>, __env2_t<_Env>...>) { - // If child completes inline, then starts_on completes on its scheduler + // If the child completes where it starts, then starts_on(Sch,Child) completes on + // scheduler Sch. return __sched_; } - // Query for completion scheduler - delegates to child's env with augmented environment + // Query for completion scheduler - delegates to child's env with augmented + // environment template + requires(!__completes_where_it_starts<_SetTag, env_of_t<_Child>, __env2_t<_Env>...>) STDEXEC_ATTRIBUTE(nodiscard, always_inline, host, device) constexpr auto query(get_completion_scheduler_t<_SetTag> __query, _Env&&... 
__env) const noexcept -> __call_result_t, env_of_t<_Child>, __env2_t<_Env>...> - requires(!__completes_inline<_SetTag, env_of_t<_Child>, __env2_t<_Env>...>) { // If child doesn't complete inline, delegate to child's completion scheduler - return __query(__attr_, __mk_env2(__sched_, static_cast<_Env&&>(__env))...); + return __query(__attr_, __mk_env2(__sched_, __env)...); } - // Query for completion domain - calculate type from child's env with augmented environment + // Query for completion domain - calculate type from child's env with augmented + // environment template STDEXEC_ATTRIBUTE(nodiscard, always_inline, host, device) constexpr auto query(get_completion_domain_t<_SetTag>, _Env&&...) const noexcept @@ -112,17 +100,51 @@ namespace STDEXEC { return {}; } - - _Scheduler __sched_; - env_of_t<_Child> __attr_; }; + } // namespace __starts_on + + ///////////////////////////////////////////////////////////////////////////// + // [exec.starts.on] + struct starts_on_t + { + template + constexpr auto + operator()(_Scheduler&& __sched, _Sender&& __sndr) const -> __well_formed_sender auto + { + return __make_sexpr(static_cast<_Scheduler&&>(__sched), + static_cast<_Sender&&>(__sndr)); + } + + template <__decay_copyable _Sender> + static constexpr auto transform_sender(set_value_t, _Sender&& __sndr, __ignore) + { + auto& [__tag, __sched, __child] = __sndr; + // NOT TO SPEC: the specification requires that this be implemented in terms of + // let_value(schedule(sch), []{ return child; }), but that implementation + // is inefficient on the GPU. We could customize starts_on for the GPU to use this + // implementation, but this is a good change to make for all platforms since it + // avoids unnecessarily making the child sender dependent on the completion of the + // schedule operation. 
+ return __sequence(continues_on(just(), __sched), STDEXEC::__forward_like<_Sender>(__child)); + } + template + static constexpr auto transform_sender(set_value_t, _Sender&&, __ignore) + { + return __not_a_sender<_SENDER_TYPE_IS_NOT_DECAY_COPYABLE_, _WITH_PRETTY_SENDER_<_Sender>>{}; + } + }; + + inline constexpr starts_on_t starts_on{}; + + template <> + struct __sexpr_impl : __sexpr_defaults + { static constexpr auto __get_attrs = - [](__ignore, - _Data const & __data, - _Child const & __child) noexcept -> __attrs<_Data, _Child> + [](__ignore, _Data const & __data, _Child const & __child) noexcept + -> __starts_on::__attrs<_Data, _Child> { - return __attrs<_Data, _Child>{__data, STDEXEC::get_env(__child)}; + return __starts_on::__attrs<_Data, _Child>{__data, STDEXEC::get_env(__child)}; }; template diff --git a/include/stdexec/__detail/__transform_sender.hpp b/include/stdexec/__detail/__transform_sender.hpp index 49799d026..05f08a764 100644 --- a/include/stdexec/__detail/__transform_sender.hpp +++ b/include/stdexec/__detail/__transform_sender.hpp @@ -198,10 +198,6 @@ namespace STDEXEC concept __completes_on = __decays_to<__call_result_t, env_of_t<_Sender>, _Env>, _Scheduler>; - - ///////////////////////////////////////////////////////////////////////////// - template - concept __starts_on = __decays_to<__call_result_t, _Scheduler>; } // namespace STDEXEC STDEXEC_PRAGMA_POP()