TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 : #define BOOST_CAPY_WHEN_ANY_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/concept/executor.hpp>
15 : #include <boost/capy/concept/io_awaitable.hpp>
16 : #include <coroutine>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 : #include <boost/capy/ex/frame_allocator.hpp>
19 : #include <boost/capy/ex/io_env.hpp>
20 : #include <boost/capy/task.hpp>
21 :
22 : #include <array>
23 : #include <atomic>
24 : #include <exception>
25 : #include <optional>
26 : #include <ranges>
27 : #include <stdexcept>
28 : #include <stop_token>
29 : #include <tuple>
30 : #include <type_traits>
31 : #include <utility>
32 : #include <variant>
33 : #include <vector>
34 :
35 : /*
36 : when_any - Race multiple tasks, return first completion
37 : ========================================================
38 :
39 : OVERVIEW:
40 : ---------
41 : when_any launches N tasks concurrently and completes when the FIRST task
42 : finishes (success or failure). It then requests stop for all siblings and
43 : waits for them to acknowledge before returning.
44 :
45 : ARCHITECTURE:
46 : -------------
47 : The design mirrors when_all but with inverted completion semantics:
48 :
49 : when_all: complete when remaining_count reaches 0 (all done)
50 : when_any: complete when has_winner becomes true (first done)
51 : BUT still wait for remaining_count to reach 0 for cleanup
52 :
53 : Key components:
54 : - when_any_state: Shared state tracking winner and completion
55 : - when_any_runner: Wrapper coroutine for each child task
56 : - when_any_launcher: Awaitable that starts all runners concurrently
57 :
58 : CRITICAL INVARIANTS:
59 : --------------------
60 : 1. Exactly one task becomes the winner (via atomic compare_exchange)
61 : 2. All tasks must complete before parent resumes (cleanup safety)
62 : 3. Stop is requested immediately when winner is determined
63 : 4. Only the winner's result/exception is stored
64 :
65 : TYPE DEDUPLICATION:
66 : -------------------
67 : std::variant requires unique alternative types. Since when_any can race
68 : tasks with identical return types (e.g., three task<int>), we must
69 : deduplicate types before constructing the variant.
70 :
71 : Example: when_any(task<int>, task<string>, task<int>)
72 : - Raw types after void->monostate: int, string, int
73 : - Deduplicated variant: std::variant<int, string>
74 : - Return: pair<size_t, variant<int, string>>
75 :
76 : The winner_index tells you which task won (0, 1, or 2), while the variant
77 : holds the result. Use the index to determine how to interpret the variant.
78 :
79 : VOID HANDLING:
80 : --------------
81 : void tasks contribute std::monostate to the variant (then deduplicated).
82 : All-void tasks result in: pair<size_t, variant<monostate>>
83 :
84 : MEMORY MODEL:
85 : -------------
86 : Synchronization chain from winner's write to parent's read:
87 :
88 : 1. Winner thread writes result_/winner_exception_ (non-atomic)
89 : 2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_
90 : 3. Last task thread (may be winner or non-winner) calls signal_completion()
91 : → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
92 : 4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
93 : 5. Parent coroutine resumes and reads result_/winner_exception_
94 :
95 : Synchronization analysis:
96 : - All fetch_sub operations on remaining_count_ form a release sequence
97 : - Winner's fetch_sub releases; subsequent fetch_sub operations participate
98 : in the modification order of remaining_count_
99 : - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
100 : modification order, establishing happens-before from winner's writes
101 : - Executor dispatch() is expected to provide queue-based synchronization
102 : (release-on-post, acquire-on-execute) completing the chain to parent
103 : - Even inline executors work (same thread = sequenced-before)
104 :
105 : Alternative considered: Adding winner_ready_ atomic (set with release after
106 : storing winner data, acquired before reading) would make synchronization
107 : self-contained and not rely on executor implementation details. Current
108 : approach is correct but requires careful reasoning about release sequences
109 : and executor behavior.
110 :
111 : EXCEPTION SEMANTICS:
112 : --------------------
113 : Unlike when_all (which captures first exception, discards others), when_any
114 : treats exceptions as valid completions. If the winning task threw, that
115 : exception is rethrown. Exceptions from non-winners are silently discarded.
116 : */
117 :
118 : namespace boost {
119 : namespace capy {
120 :
121 : namespace detail {
122 :
123 : /** Convert void to monostate for variant storage.
124 :
125 : std::variant<void, ...> is ill-formed, so void tasks contribute
126 : std::monostate to the result variant instead. Non-void types
127 : pass through unchanged.
128 :
129 : @tparam T The type to potentially convert (void becomes monostate).
130 : */
131 : template<typename T>
132 : using void_to_monostate_t = std::conditional_t<std::is_void_v<T>, std::monostate, T>;
133 :
134 : // Type deduplication: std::variant requires unique alternative types.
135 : // Fold left over the type list, appending each type only if not already present.
136 : template<typename Variant, typename T>
137 : struct variant_append_if_unique;
138 :
139 : template<typename... Vs, typename T>
140 : struct variant_append_if_unique<std::variant<Vs...>, T>
141 : {
142 : using type = std::conditional_t<
143 : (std::is_same_v<T, Vs> || ...),
144 : std::variant<Vs...>,
145 : std::variant<Vs..., T>>;
146 : };
147 :
148 : template<typename Accumulated, typename... Remaining>
149 : struct deduplicate_impl;
150 :
151 : template<typename Accumulated>
152 : struct deduplicate_impl<Accumulated>
153 : {
154 : using type = Accumulated;
155 : };
156 :
157 : template<typename Accumulated, typename T, typename... Rest>
158 : struct deduplicate_impl<Accumulated, T, Rest...>
159 : {
160 : using next = typename variant_append_if_unique<Accumulated, T>::type;
161 : using type = typename deduplicate_impl<next, Rest...>::type;
162 : };
163 :
164 : // Deduplicated variant; void types become monostate before deduplication
165 : template<typename T0, typename... Ts>
166 : using unique_variant_t = typename deduplicate_impl<
167 : std::variant<void_to_monostate_t<T0>>,
168 : void_to_monostate_t<Ts>...>::type;
169 :
170 : // Result: (winner_index, deduplicated_variant). Use index to disambiguate
171 : // when multiple tasks share the same return type.
172 : template<typename T0, typename... Ts>
173 : using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
174 :
175 : /** Core shared state for when_any operations.
176 :
177 : Contains all members and methods common to both heterogeneous (variadic)
178 : and homogeneous (range) when_any implementations. State classes embed
179 : this via composition to avoid CRTP destructor ordering issues.
180 :
181 : @par Thread Safety
182 : Atomic operations protect winner selection and completion count.
183 : */
184 : struct when_any_core
185 : {
186 : std::atomic<std::size_t> remaining_count_;
187 : std::size_t winner_index_{0};
188 : std::exception_ptr winner_exception_;
189 : std::stop_source stop_source_;
190 :
191 : // Bridges parent's stop token to our stop_source
192 : struct stop_callback_fn
193 : {
194 : std::stop_source* source_;
195 HIT 9 : void operator()() const noexcept { source_->request_stop(); }
196 : };
197 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
198 : std::optional<stop_callback_t> parent_stop_callback_;
199 :
200 : std::coroutine_handle<> continuation_;
201 : io_env const* caller_env_ = nullptr;
202 :
203 : // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
204 : std::atomic<bool> has_winner_{false};
205 :
206 65 : explicit when_any_core(std::size_t count) noexcept
207 65 : : remaining_count_(count)
208 : {
209 65 : }
210 :
211 : /** Atomically claim winner status; exactly one task succeeds. */
212 190 : bool try_win(std::size_t index) noexcept
213 : {
214 190 : bool expected = false;
215 190 : if(has_winner_.compare_exchange_strong(
216 : expected, true, std::memory_order_acq_rel))
217 : {
218 65 : winner_index_ = index;
219 65 : stop_source_.request_stop();
220 65 : return true;
221 : }
222 125 : return false;
223 : }
224 :
225 : /** @pre try_win() returned true. */
226 8 : void set_winner_exception(std::exception_ptr ep) noexcept
227 : {
228 8 : winner_exception_ = ep;
229 8 : }
230 :
231 : // Runners signal completion directly via final_suspend; no member function needed.
232 : };
233 :
234 : /** Shared state for heterogeneous when_any operation.
235 :
236 : Coordinates winner selection, result storage, and completion tracking
237 : for all child tasks in a when_any operation. Uses composition with
238 : when_any_core for shared functionality.
239 :
240 : @par Lifetime
241 : Allocated on the parent coroutine's frame, outlives all runners.
242 :
243 : @tparam T0 First task's result type.
244 : @tparam Ts Remaining tasks' result types.
245 : */
246 : template<typename T0, typename... Ts>
247 : struct when_any_state
248 : {
249 : static constexpr std::size_t task_count = 1 + sizeof...(Ts);
250 : using variant_type = unique_variant_t<T0, Ts...>;
251 :
252 : when_any_core core_;
253 : std::optional<variant_type> result_;
254 : std::array<std::coroutine_handle<>, task_count> runner_handles_{};
255 :
256 43 : when_any_state()
257 43 : : core_(task_count)
258 : {
259 43 : }
260 :
261 : // Runners self-destruct in final_suspend. No destruction needed here.
262 :
263 : /** @pre core_.try_win() returned true.
264 : @note Uses in_place_type (not index) because variant is deduplicated.
265 : */
266 : template<typename T>
267 35 : void set_winner_result(T value)
268 : noexcept(std::is_nothrow_move_constructible_v<T>)
269 : {
270 35 : result_.emplace(std::in_place_type<T>, std::move(value));
271 35 : }
272 :
273 : /** @pre core_.try_win() returned true. */
274 3 : void set_winner_void() noexcept
275 : {
276 3 : result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
277 3 : }
278 : };
279 :
280 : /** Wrapper coroutine that runs a single child task for when_any.
281 :
282 : Propagates executor/stop_token to the child, attempts to claim winner
283 : status on completion, and signals completion for cleanup coordination.
284 :
285 : @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
286 : */
287 : template<typename StateType>
288 : struct when_any_runner
289 : {
290 : struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
291 : {
292 : StateType* state_ = nullptr;
293 : std::size_t index_ = 0;
294 : io_env env_;
295 :
296 190 : when_any_runner get_return_object() noexcept
297 : {
298 190 : return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
299 : }
300 :
301 : // Starts suspended; launcher sets up state/ex/token then resumes
302 190 : std::suspend_always initial_suspend() noexcept
303 : {
304 190 : return {};
305 : }
306 :
307 190 : auto final_suspend() noexcept
308 : {
309 : struct awaiter
310 : {
311 : promise_type* p_;
312 190 : bool await_ready() const noexcept { return false; }
313 190 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
314 : {
315 : // Extract everything needed before self-destruction.
316 190 : auto& core = p_->state_->core_;
317 190 : auto* counter = &core.remaining_count_;
318 190 : auto* caller_env = core.caller_env_;
319 190 : auto cont = core.continuation_;
320 :
321 190 : h.destroy();
322 :
323 : // If last runner, dispatch parent for symmetric transfer.
324 190 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
325 190 : if(remaining == 1)
326 65 : return caller_env->executor.dispatch(cont);
327 125 : return std::noop_coroutine();
328 : }
329 MIS 0 : void await_resume() const noexcept {}
330 : };
331 HIT 190 : return awaiter{this};
332 : }
333 :
334 178 : void return_void() noexcept {}
335 :
336 : // Exceptions are valid completions in when_any (unlike when_all)
337 12 : void unhandled_exception()
338 : {
339 12 : if(state_->core_.try_win(index_))
340 8 : state_->core_.set_winner_exception(std::current_exception());
341 12 : }
342 :
343 : /** Injects executor and stop token into child awaitables. */
344 : template<class Awaitable>
345 : struct transform_awaiter
346 : {
347 : std::decay_t<Awaitable> a_;
348 : promise_type* p_;
349 :
350 190 : bool await_ready() { return a_.await_ready(); }
351 190 : auto await_resume() { return a_.await_resume(); }
352 :
353 : template<class Promise>
354 185 : auto await_suspend(std::coroutine_handle<Promise> h)
355 : {
356 : #ifdef _MSC_VER
357 : using R = decltype(a_.await_suspend(h, &p_->env_));
358 : if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
359 : a_.await_suspend(h, &p_->env_).resume();
360 : else
361 : return a_.await_suspend(h, &p_->env_);
362 : #else
363 185 : return a_.await_suspend(h, &p_->env_);
364 : #endif
365 : }
366 : };
367 :
368 : template<class Awaitable>
369 190 : auto await_transform(Awaitable&& a)
370 : {
371 : using A = std::decay_t<Awaitable>;
372 : if constexpr (IoAwaitable<A>)
373 : {
374 : return transform_awaiter<Awaitable>{
375 380 : std::forward<Awaitable>(a), this};
376 : }
377 : else
378 : {
379 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
380 : }
381 190 : }
382 : };
383 :
384 : std::coroutine_handle<promise_type> h_;
385 :
386 190 : explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
387 190 : : h_(h)
388 : {
389 190 : }
390 :
391 : // Enable move for all clang versions - some versions need it
392 : when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
393 :
394 : // Non-copyable
395 : when_any_runner(when_any_runner const&) = delete;
396 : when_any_runner& operator=(when_any_runner const&) = delete;
397 : when_any_runner& operator=(when_any_runner&&) = delete;
398 :
399 190 : auto release() noexcept
400 : {
401 190 : return std::exchange(h_, nullptr);
402 : }
403 : };
404 :
405 : /** Wraps a child awaitable, attempts to claim winner on completion.
406 :
407 : Uses requires-expressions to detect state capabilities:
408 : - set_winner_void(): for heterogeneous void tasks (stores monostate)
409 : - set_winner_result(): for non-void tasks
410 : - Neither: for homogeneous void tasks (no result storage)
411 : */
412 : template<IoAwaitable Awaitable, typename StateType>
413 : when_any_runner<StateType>
414 190 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
415 : {
416 : using T = awaitable_result_t<Awaitable>;
417 : if constexpr (std::is_void_v<T>)
418 : {
419 : co_await std::move(inner);
420 : if(state->core_.try_win(index))
421 : {
422 : // Heterogeneous void tasks store monostate in the variant
423 : if constexpr (requires { state->set_winner_void(); })
424 : state->set_winner_void();
425 : // Homogeneous void tasks have no result to store
426 : }
427 : }
428 : else
429 : {
430 : auto result = co_await std::move(inner);
431 : if(state->core_.try_win(index))
432 : {
433 : // Defensive: move should not throw (already moved once), but we
434 : // catch just in case since an uncaught exception would be devastating.
435 : try
436 : {
437 : state->set_winner_result(std::move(result));
438 : }
439 : catch(...)
440 : {
441 : state->core_.set_winner_exception(std::current_exception());
442 : }
443 : }
444 : }
445 380 : }
446 :
447 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
448 : template<IoAwaitable... Awaitables>
449 : class when_any_launcher
450 : {
451 : using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
452 :
453 : std::tuple<Awaitables...>* tasks_;
454 : state_type* state_;
455 :
456 : public:
457 43 : when_any_launcher(
458 : std::tuple<Awaitables...>* tasks,
459 : state_type* state)
460 43 : : tasks_(tasks)
461 43 : , state_(state)
462 : {
463 43 : }
464 :
465 43 : bool await_ready() const noexcept
466 : {
467 43 : return sizeof...(Awaitables) == 0;
468 : }
469 :
470 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
471 : destroys this object before await_suspend returns. Must not reference
472 : `this` after the final launch_one call.
473 : */
474 43 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
475 : {
476 43 : state_->core_.continuation_ = continuation;
477 43 : state_->core_.caller_env_ = caller_env;
478 :
479 43 : if(caller_env->stop_token.stop_possible())
480 : {
481 18 : state_->core_.parent_stop_callback_.emplace(
482 9 : caller_env->stop_token,
483 9 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
484 :
485 9 : if(caller_env->stop_token.stop_requested())
486 3 : state_->core_.stop_source_.request_stop();
487 : }
488 :
489 43 : auto token = state_->core_.stop_source_.get_token();
490 86 : [&]<std::size_t... Is>(std::index_sequence<Is...>) {
491 43 : (..., launch_one<Is>(caller_env->executor, token));
492 43 : }(std::index_sequence_for<Awaitables...>{});
493 :
494 86 : return std::noop_coroutine();
495 43 : }
496 :
497 43 : void await_resume() const noexcept
498 : {
499 43 : }
500 :
501 : private:
502 : /** @pre Ex::dispatch() and std::coroutine_handle<>::resume() must not throw (handle may leak). */
503 : template<std::size_t I>
504 105 : void launch_one(executor_ref caller_ex, std::stop_token token)
505 : {
506 105 : auto runner = make_when_any_runner(
507 105 : std::move(std::get<I>(*tasks_)), state_, I);
508 :
509 105 : auto h = runner.release();
510 105 : h.promise().state_ = state_;
511 105 : h.promise().index_ = I;
512 105 : h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->allocator};
513 :
514 105 : std::coroutine_handle<> ch{h};
515 105 : state_->runner_handles_[I] = ch;
516 105 : caller_ex.post(ch);
517 210 : }
518 : };
519 :
520 : } // namespace detail
521 :
522 : /** Wait for the first awaitable to complete.
523 :
524 : Races multiple heterogeneous awaitables concurrently and returns when the
525 : first one completes. The result includes the winner's index and a
526 : deduplicated variant containing the result value.
527 :
528 : @par Suspends
529 : The calling coroutine suspends when co_await is invoked. All awaitables
530 : are launched concurrently and execute in parallel. The coroutine resumes
531 : only after all awaitables have completed, even though the winner is
532 : determined by the first to finish.
533 :
534 : @par Completion Conditions
535 : @li Winner is determined when the first awaitable completes (success or exception)
536 : @li Only one task can claim winner status via atomic compare-exchange
537 : @li Once a winner exists, stop is requested for all remaining siblings
538 : @li Parent coroutine resumes only after all siblings acknowledge completion
539 : @li The winner's result is returned; if the winner threw, the exception is rethrown
540 :
541 : @par Cancellation Semantics
542 : Cancellation is supported via stop_token propagated through the
543 : IoAwaitable protocol:
544 : @li Each child awaitable receives a stop_token derived from a shared stop_source
545 : @li When the parent's stop token is activated, the stop is forwarded to all children
546 : @li When a winner is determined, stop_source_.request_stop() is called immediately
547 : @li Siblings must handle cancellation gracefully and complete before parent resumes
548 : @li Stop requests are cooperative; tasks must check and respond to them
549 :
550 : @par Concurrency/Overlap
551 : All awaitables are launched concurrently before any can complete.
552 : The launcher iterates through the arguments, starting each task on the
553 : caller's executor. Tasks may execute in parallel on multi-threaded
554 : executors or interleave on single-threaded executors. There is no
555 : guaranteed ordering of task completion.
556 :
557 : @par Notable Error Conditions
558 : @li Winner exception: if the winning task threw, that exception is rethrown
559 : @li Non-winner exceptions: silently discarded (only winner's result matters)
560 : @li Cancellation: tasks may complete via cancellation without throwing
561 :
562 : @par Example
563 : @code
564 : task<void> example() {
565 : auto [index, result] = co_await when_any(
566 : fetch_from_primary(), // task<Response>
567 : fetch_from_backup() // task<Response>
568 : );
569 : // index is 0 or 1, result holds the winner's Response
570 : auto response = std::get<Response>(result);
571 : }
572 : @endcode
573 :
574 : @par Example with Heterogeneous Types
575 : @code
576 : task<void> mixed_types() {
577 : auto [index, result] = co_await when_any(
578 : fetch_int(), // task<int>
579 : fetch_string() // task<std::string>
580 : );
581 : if (index == 0)
582 : std::cout << "Got int: " << std::get<int>(result) << "\n";
583 : else
584 : std::cout << "Got string: " << std::get<std::string>(result) << "\n";
585 : }
586 : @endcode
587 :
588 : @tparam A0 First awaitable type (must satisfy IoAwaitable).
589 : @tparam As Remaining awaitable types (must satisfy IoAwaitable).
590 : @param a0 The first awaitable to race.
591 : @param as Additional awaitables to race concurrently.
592 : @return A task yielding a pair of (winner_index, result_variant).
593 :
594 : @throws Rethrows the winner's exception if the winning task threw an exception.
595 :
596 : @par Remarks
597 : Awaitables are moved into the coroutine frame; original objects become
598 : empty after the call. When multiple awaitables share the same return type,
599 : the variant is deduplicated to contain only unique types. Use the winner
600 : index to determine which awaitable completed first. Void awaitables
601 : contribute std::monostate to the variant.
602 :
603 : @see when_all, IoAwaitable
604 : */
605 : template<IoAwaitable A0, IoAwaitable... As>
606 43 : [[nodiscard]] auto when_any(A0 a0, As... as)
607 : -> task<detail::when_any_result_t<
608 : detail::awaitable_result_t<A0>,
609 : detail::awaitable_result_t<As>...>>
610 : {
611 : using result_type = detail::when_any_result_t<
612 : detail::awaitable_result_t<A0>,
613 : detail::awaitable_result_t<As>...>;
614 :
615 : detail::when_any_state<
616 : detail::awaitable_result_t<A0>,
617 : detail::awaitable_result_t<As>...> state;
618 : std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);
619 :
620 : co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);
621 :
622 : if(state.core_.winner_exception_)
623 : std::rethrow_exception(state.core_.winner_exception_);
624 :
625 : co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
626 86 : }
627 :
628 : /** Concept for ranges of full I/O awaitables.
629 :
630 : A range satisfies `IoAwaitableRange` if it is a sized input range
631 : whose value type satisfies @ref IoAwaitable. This enables when_any
632 : to accept any container or view of awaitables, not just std::vector.
633 :
634 : @tparam R The range type.
635 :
636 : @par Requirements
637 : @li `R` must satisfy `std::ranges::input_range`
638 : @li `R` must satisfy `std::ranges::sized_range`
639 : @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
640 :
641 : @par Syntactic Requirements
642 : Given `r` of type `R`:
643 : @li `std::ranges::begin(r)` is valid
644 : @li `std::ranges::end(r)` is valid
645 : @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
646 : @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
647 :
648 : @par Example
649 : @code
650 : template<IoAwaitableRange R>
651 : task<void> race_all(R&& awaitables) {
652 : auto winner = co_await when_any(std::forward<R>(awaitables));
653 : // Process winner...
654 : }
655 : @endcode
656 :
657 : @see when_any, IoAwaitable
658 : */
659 : template<typename R>
660 : concept IoAwaitableRange =
661 : std::ranges::input_range<R> &&
662 : std::ranges::sized_range<R> &&
663 : IoAwaitable<std::ranges::range_value_t<R>>;
664 :
665 : namespace detail {
666 :
667 : /** Shared state for homogeneous when_any (range overload).
668 :
669 : Uses composition with when_any_core for shared functionality.
670 : Simpler than heterogeneous: optional<T> instead of variant, vector
671 : instead of array for runner handles.
672 : */
673 : template<typename T>
674 : struct when_any_homogeneous_state
675 : {
676 : when_any_core core_;
677 : std::optional<T> result_;
678 : std::vector<std::coroutine_handle<>> runner_handles_;
679 :
680 19 : explicit when_any_homogeneous_state(std::size_t count)
681 19 : : core_(count)
682 38 : , runner_handles_(count)
683 : {
684 19 : }
685 :
686 : // Runners self-destruct in final_suspend. No destruction needed here.
687 :
688 : /** @pre core_.try_win() returned true. */
689 17 : void set_winner_result(T value)
690 : noexcept(std::is_nothrow_move_constructible_v<T>)
691 : {
692 17 : result_.emplace(std::move(value));
693 17 : }
694 : };
695 :
696 : /** Specialization for void tasks (no result storage needed). */
697 : template<>
698 : struct when_any_homogeneous_state<void>
699 : {
700 : when_any_core core_;
701 : std::vector<std::coroutine_handle<>> runner_handles_;
702 :
703 3 : explicit when_any_homogeneous_state(std::size_t count)
704 3 : : core_(count)
705 6 : , runner_handles_(count)
706 : {
707 3 : }
708 :
709 : // Runners self-destruct in final_suspend. No destruction needed here.
710 :
711 : // No set_winner_result - void tasks have no result to store
712 : };
713 :
714 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
715 : template<IoAwaitableRange Range>
716 : class when_any_homogeneous_launcher
717 : {
718 : using Awaitable = std::ranges::range_value_t<Range>;
719 : using T = awaitable_result_t<Awaitable>;
720 :
721 : Range* range_;
722 : when_any_homogeneous_state<T>* state_;
723 :
724 : public:
725 22 : when_any_homogeneous_launcher(
726 : Range* range,
727 : when_any_homogeneous_state<T>* state)
728 22 : : range_(range)
729 22 : , state_(state)
730 : {
731 22 : }
732 :
733 22 : bool await_ready() const noexcept
734 : {
735 22 : return std::ranges::empty(*range_);
736 : }
737 :
738 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
739 : destroys this object before await_suspend returns. Must not reference
740 : `this` after dispatching begins.
741 :
742 : Two-phase approach:
743 : 1. Create all runners (safe - no dispatch yet)
744 : 2. Dispatch all runners (any may complete synchronously)
745 : */
746 22 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
747 : {
748 22 : state_->core_.continuation_ = continuation;
749 22 : state_->core_.caller_env_ = caller_env;
750 :
751 22 : if(caller_env->stop_token.stop_possible())
752 : {
753 14 : state_->core_.parent_stop_callback_.emplace(
754 7 : caller_env->stop_token,
755 7 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
756 :
757 7 : if(caller_env->stop_token.stop_requested())
758 4 : state_->core_.stop_source_.request_stop();
759 : }
760 :
761 22 : auto token = state_->core_.stop_source_.get_token();
762 :
763 : // Phase 1: Create all runners without dispatching.
764 : // This iterates over *range_ safely because no runners execute yet.
765 22 : std::size_t index = 0;
766 107 : for(auto&& a : *range_)
767 : {
768 85 : auto runner = make_when_any_runner(
769 85 : std::move(a), state_, index);
770 :
771 85 : auto h = runner.release();
772 85 : h.promise().state_ = state_;
773 85 : h.promise().index_ = index;
774 85 : h.promise().env_ = io_env{caller_env->executor, token, caller_env->allocator};
775 :
776 85 : state_->runner_handles_[index] = std::coroutine_handle<>{h};
777 85 : ++index;
778 : }
779 :
780 : // Phase 2: Post all runners. Any may complete synchronously.
781 : // After last post, state_ and this may be destroyed.
782 : // Use raw pointer/count captured before posting.
783 22 : std::coroutine_handle<>* handles = state_->runner_handles_.data();
784 22 : std::size_t count = state_->runner_handles_.size();
785 107 : for(std::size_t i = 0; i < count; ++i)
786 85 : caller_env->executor.post(handles[i]);
787 :
788 44 : return std::noop_coroutine();
789 107 : }
790 :
791 22 : void await_resume() const noexcept
792 : {
793 22 : }
794 : };
795 :
796 : } // namespace detail
797 :
798 : /** Wait for the first awaitable to complete (range overload).
799 :
800 : Races a range of awaitables with the same result type. Accepts any
801 : sized input range of IoAwaitable types, enabling use with arrays,
802 : spans, or custom containers.
803 :
804 : @par Suspends
805 : The calling coroutine suspends when co_await is invoked. All awaitables
806 : in the range are launched concurrently and execute in parallel. The
807 : coroutine resumes only after all awaitables have completed, even though
808 : the winner is determined by the first to finish.
809 :
810 : @par Completion Conditions
811 : @li Winner is determined when the first awaitable completes (success or exception)
812 : @li Only one task can claim winner status via atomic compare-exchange
813 : @li Once a winner exists, stop is requested for all remaining siblings
814 : @li Parent coroutine resumes only after all siblings acknowledge completion
815 : @li The winner's index and result are returned; if the winner threw, the exception is rethrown
816 :
817 : @par Cancellation Semantics
818 : Cancellation is supported via stop_token propagated through the
819 : IoAwaitable protocol:
820 : @li Each child awaitable receives a stop_token derived from a shared stop_source
821 : @li When the parent's stop token is activated, the stop is forwarded to all children
822 : @li When a winner is determined, stop_source_.request_stop() is called immediately
823 : @li Siblings must handle cancellation gracefully and complete before parent resumes
824 : @li Stop requests are cooperative; tasks must check and respond to them
825 :
826 : @par Concurrency/Overlap
827 : All awaitables are launched concurrently before any can complete.
828 : The launcher iterates through the range, starting each task on the
829 : caller's executor. Tasks may execute in parallel on multi-threaded
830 : executors or interleave on single-threaded executors. There is no
831 : guaranteed ordering of task completion.
832 :
833 : @par Notable Error Conditions
834 : @li Empty range: throws std::invalid_argument immediately (not via co_return)
835 : @li Winner exception: if the winning task threw, that exception is rethrown
836 : @li Non-winner exceptions: silently discarded (only winner's result matters)
837 : @li Cancellation: tasks may complete via cancellation without throwing
838 :
839 : @par Example
840 : @code
841 : task<void> example() {
842 : std::array<task<Response>, 3> requests = {
843 : fetch_from_server(0),
844 : fetch_from_server(1),
845 : fetch_from_server(2)
846 : };
847 :
848 : auto [index, response] = co_await when_any(std::move(requests));
849 : }
850 : @endcode
851 :
852 : @par Example with Vector
853 : @code
854 : task<Response> fetch_fastest(std::vector<Server> const& servers) {
855 : std::vector<task<Response>> requests;
856 : for (auto const& server : servers)
857 : requests.push_back(fetch_from(server));
858 :
859 : auto [index, response] = co_await when_any(std::move(requests));
860 : co_return response;
861 : }
862 : @endcode
863 :
864 : @tparam R Range type satisfying IoAwaitableRange.
865 : @param awaitables Range of awaitables to race concurrently (must not be empty).
866 : @return A task yielding a pair of (winner_index, result).
867 :
868 : @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
869 : @throws Rethrows the winner's exception if the winning task threw an exception.
870 :
871 : @par Remarks
872 : Elements are moved from the range; for lvalue ranges, the original
873 : container will have moved-from elements after this call. The range
874 : is moved onto the coroutine frame to ensure lifetime safety. Unlike
875 : the variadic overload, no variant wrapper is needed since all tasks
876 : share the same return type.
877 :
878 : @see when_any, IoAwaitableRange
879 : */
880 : template<IoAwaitableRange R>
881 : requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
882 21 : [[nodiscard]] auto when_any(R&& awaitables)
883 : -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
884 : {
885 : using Awaitable = std::ranges::range_value_t<R>;
886 : using T = detail::awaitable_result_t<Awaitable>;
887 : using result_type = std::pair<std::size_t, T>;
888 : using OwnedRange = std::remove_cvref_t<R>;
889 :
890 : auto count = std::ranges::size(awaitables);
891 : if(count == 0)
892 : throw std::invalid_argument("when_any requires at least one awaitable");
893 :
894 : // Move/copy range onto coroutine frame to ensure lifetime
895 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
896 :
897 : detail::when_any_homogeneous_state<T> state(count);
898 :
899 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
900 :
901 : if(state.core_.winner_exception_)
902 : std::rethrow_exception(state.core_.winner_exception_);
903 :
904 : co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
905 42 : }
906 :
907 : /** Wait for the first awaitable to complete (void range overload).
908 :
909 : Races a range of void-returning awaitables. Since void awaitables have
910 : no result value, only the winner's index is returned.
911 :
912 : @par Suspends
913 : The calling coroutine suspends when co_await is invoked. All awaitables
914 : in the range are launched concurrently and execute in parallel. The
915 : coroutine resumes only after all awaitables have completed, even though
916 : the winner is determined by the first to finish.
917 :
918 : @par Completion Conditions
919 : @li Winner is determined when the first awaitable completes (success or exception)
920 : @li Only one task can claim winner status via atomic compare-exchange
921 : @li Once a winner exists, stop is requested for all remaining siblings
922 : @li Parent coroutine resumes only after all siblings acknowledge completion
923 : @li The winner's index is returned; if the winner threw, the exception is rethrown
924 :
925 : @par Cancellation Semantics
926 : Cancellation is supported via stop_token propagated through the
927 : IoAwaitable protocol:
928 : @li Each child awaitable receives a stop_token derived from a shared stop_source
929 : @li When the parent's stop token is activated, the stop is forwarded to all children
930 : @li When a winner is determined, stop_source_.request_stop() is called immediately
931 : @li Siblings must handle cancellation gracefully and complete before parent resumes
932 : @li Stop requests are cooperative; tasks must check and respond to them
933 :
934 : @par Concurrency/Overlap
935 : All awaitables are launched concurrently before any can complete.
936 : The launcher iterates through the range, starting each task on the
937 : caller's executor. Tasks may execute in parallel on multi-threaded
938 : executors or interleave on single-threaded executors. There is no
939 : guaranteed ordering of task completion.
940 :
941 : @par Notable Error Conditions
942 : @li Empty range: throws std::invalid_argument immediately (not via co_return)
943 : @li Winner exception: if the winning task threw, that exception is rethrown
944 : @li Non-winner exceptions: silently discarded (only winner's result matters)
945 : @li Cancellation: tasks may complete via cancellation without throwing
946 :
947 : @par Example
948 : @code
949 : task<void> example() {
950 : std::vector<task<void>> tasks;
951 : for (int i = 0; i < 5; ++i)
952 : tasks.push_back(background_work(i));
953 :
954 : std::size_t winner = co_await when_any(std::move(tasks));
955 : // winner is the index of the first task to complete
956 : }
957 : @endcode
958 :
959 : @par Example with Timeout
960 : @code
961 : task<void> with_timeout() {
962 : std::vector<task<void>> tasks;
963 : tasks.push_back(long_running_operation());
964 : tasks.push_back(delay(std::chrono::seconds(5)));
965 :
966 : std::size_t winner = co_await when_any(std::move(tasks));
967 : if (winner == 1) {
968 : // Timeout occurred
969 : }
970 : }
971 : @endcode
972 :
973 : @tparam R Range type satisfying IoAwaitableRange with void result.
974 : @param awaitables Range of void awaitables to race concurrently (must not be empty).
975 : @return A task yielding the winner's index (zero-based).
976 :
977 : @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
978 : @throws Rethrows the winner's exception if the winning task threw an exception.
979 :
980 : @par Remarks
981 : Elements are moved from the range; for lvalue ranges, the original
982 : container will have moved-from elements after this call. The range
983 : is moved onto the coroutine frame to ensure lifetime safety. Unlike
984 : the non-void overload, no result storage is needed since void tasks
985 : produce no value.
986 :
987 : @see when_any, IoAwaitableRange
988 : */
989 : template<IoAwaitableRange R>
990 : requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
991 3 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
992 : {
993 : using OwnedRange = std::remove_cvref_t<R>;
994 :
995 : auto count = std::ranges::size(awaitables);
996 : if(count == 0)
997 : throw std::invalid_argument("when_any requires at least one awaitable");
998 :
999 : // Move/copy range onto coroutine frame to ensure lifetime
1000 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
1001 :
1002 : detail::when_any_homogeneous_state<void> state(count);
1003 :
1004 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
1005 :
1006 : if(state.core_.winner_exception_)
1007 : std::rethrow_exception(state.core_.winner_exception_);
1008 :
1009 : co_return state.core_.winner_index_;
1010 6 : }
1011 :
1012 : } // namespace capy
1013 : } // namespace boost
1014 :
1015 : #endif
|