include/boost/capy/when_any.hpp

99.2% Lines (131/132) 96.4% Functions (432/448)
include/boost/capy/when_any.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 #define BOOST_CAPY_WHEN_ANY_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/concept/executor.hpp>
15 #include <boost/capy/concept/io_awaitable.hpp>
16 #include <coroutine>
17 #include <boost/capy/ex/executor_ref.hpp>
18 #include <boost/capy/ex/frame_allocator.hpp>
19 #include <boost/capy/ex/io_env.hpp>
20 #include <boost/capy/task.hpp>
21
22 #include <array>
23 #include <atomic>
24 #include <exception>
25 #include <optional>
26 #include <ranges>
27 #include <stdexcept>
28 #include <stop_token>
29 #include <tuple>
30 #include <type_traits>
31 #include <utility>
32 #include <variant>
33 #include <vector>
34
35 /*
36 when_any - Race multiple tasks, return first completion
37 ========================================================
38
39 OVERVIEW:
40 ---------
41 when_any launches N tasks concurrently and completes when the FIRST task
42 finishes (success or failure). It then requests stop for all siblings and
43 waits for them to acknowledge before returning.
44
45 ARCHITECTURE:
46 -------------
47 The design mirrors when_all but with inverted completion semantics:
48
49 when_all: complete when remaining_count reaches 0 (all done)
50 when_any: complete when has_winner becomes true (first done)
51 BUT still wait for remaining_count to reach 0 for cleanup
52
53 Key components:
54 - when_any_state: Shared state tracking winner and completion
55 - when_any_runner: Wrapper coroutine for each child task
56 - when_any_launcher: Awaitable that starts all runners concurrently
57
58 CRITICAL INVARIANTS:
59 --------------------
60 1. Exactly one task becomes the winner (via atomic compare_exchange)
61 2. All tasks must complete before parent resumes (cleanup safety)
62 3. Stop is requested immediately when winner is determined
63 4. Only the winner's result/exception is stored
64
65 TYPE DEDUPLICATION:
66 -------------------
67 std::variant requires unique alternative types. Since when_any can race
68 tasks with identical return types (e.g., three task<int>), we must
69 deduplicate types before constructing the variant.
70
71 Example: when_any(task<int>, task<string>, task<int>)
72 - Raw types after void->monostate: int, string, int
73 - Deduplicated variant: std::variant<int, string>
74 - Return: pair<size_t, variant<int, string>>
75
76 The winner_index tells you which task won (0, 1, or 2), while the variant
77 holds the result. Use the index to determine how to interpret the variant.
78
79 VOID HANDLING:
80 --------------
81 void tasks contribute std::monostate to the variant (then deduplicated).
82 All-void tasks result in: pair<size_t, variant<monostate>>
83
84 MEMORY MODEL:
85 -------------
86 Synchronization chain from winner's write to parent's read:
87
88 1. Winner thread writes result_/winner_exception_ (non-atomic)
89 2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_
90 3. Last task thread (may be winner or non-winner) calls signal_completion()
91 → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
92 4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
93 5. Parent coroutine resumes and reads result_/winner_exception_
94
95 Synchronization analysis:
96 - All fetch_sub operations on remaining_count_ form a release sequence
97 - Winner's fetch_sub releases; subsequent fetch_sub operations participate
98 in the modification order of remaining_count_
99 - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
100 modification order, establishing happens-before from winner's writes
101 - Executor dispatch() is expected to provide queue-based synchronization
102 (release-on-post, acquire-on-execute) completing the chain to parent
103 - Even inline executors work (same thread = sequenced-before)
104
105 Alternative considered: Adding winner_ready_ atomic (set with release after
106 storing winner data, acquired before reading) would make synchronization
107 self-contained and not rely on executor implementation details. Current
108 approach is correct but requires careful reasoning about release sequences
109 and executor behavior.
110
111 EXCEPTION SEMANTICS:
112 --------------------
113 Unlike when_all (which captures first exception, discards others), when_any
114 treats exceptions as valid completions. If the winning task threw, that
115 exception is rethrown. Exceptions from non-winners are silently discarded.
116 */
117
118 namespace boost {
119 namespace capy {
120
121 namespace detail {
122
123 /** Convert void to monostate for variant storage.
124
125 std::variant<void, ...> is ill-formed, so void tasks contribute
126 std::monostate to the result variant instead. Non-void types
127 pass through unchanged.
128
129 @tparam T The type to potentially convert (void becomes monostate).
130 */
131 template<typename T>
132 using void_to_monostate_t = std::conditional_t<std::is_void_v<T>, std::monostate, T>;
133
134 // Type deduplication: std::variant requires unique alternative types.
135 // Fold left over the type list, appending each type only if not already present.
136 template<typename Variant, typename T>
137 struct variant_append_if_unique;
138
139 template<typename... Vs, typename T>
140 struct variant_append_if_unique<std::variant<Vs...>, T>
141 {
142 using type = std::conditional_t<
143 (std::is_same_v<T, Vs> || ...),
144 std::variant<Vs...>,
145 std::variant<Vs..., T>>;
146 };
147
148 template<typename Accumulated, typename... Remaining>
149 struct deduplicate_impl;
150
151 template<typename Accumulated>
152 struct deduplicate_impl<Accumulated>
153 {
154 using type = Accumulated;
155 };
156
157 template<typename Accumulated, typename T, typename... Rest>
158 struct deduplicate_impl<Accumulated, T, Rest...>
159 {
160 using next = typename variant_append_if_unique<Accumulated, T>::type;
161 using type = typename deduplicate_impl<next, Rest...>::type;
162 };
163
164 // Deduplicated variant; void types become monostate before deduplication
165 template<typename T0, typename... Ts>
166 using unique_variant_t = typename deduplicate_impl<
167 std::variant<void_to_monostate_t<T0>>,
168 void_to_monostate_t<Ts>...>::type;
169
170 // Result: (winner_index, deduplicated_variant). Use index to disambiguate
171 // when multiple tasks share the same return type.
172 template<typename T0, typename... Ts>
173 using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
174
175 /** Core shared state for when_any operations.
176
177 Contains all members and methods common to both heterogeneous (variadic)
178 and homogeneous (range) when_any implementations. State classes embed
179 this via composition to avoid CRTP destructor ordering issues.
180
181 @par Thread Safety
182 Atomic operations protect winner selection and completion count.
183 */
184 struct when_any_core
185 {
186 std::atomic<std::size_t> remaining_count_;
187 std::size_t winner_index_{0};
188 std::exception_ptr winner_exception_;
189 std::stop_source stop_source_;
190
191 // Bridges parent's stop token to our stop_source
192 struct stop_callback_fn
193 {
194 std::stop_source* source_;
195 9 void operator()() const noexcept { source_->request_stop(); }
196 };
197 using stop_callback_t = std::stop_callback<stop_callback_fn>;
198 std::optional<stop_callback_t> parent_stop_callback_;
199
200 std::coroutine_handle<> continuation_;
201 io_env const* caller_env_ = nullptr;
202
203 // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
204 std::atomic<bool> has_winner_{false};
205
206 65 explicit when_any_core(std::size_t count) noexcept
207 65 : remaining_count_(count)
208 {
209 65 }
210
211 /** Atomically claim winner status; exactly one task succeeds. */
212 190 bool try_win(std::size_t index) noexcept
213 {
214 190 bool expected = false;
215 190 if(has_winner_.compare_exchange_strong(
216 expected, true, std::memory_order_acq_rel))
217 {
218 65 winner_index_ = index;
219 65 stop_source_.request_stop();
220 65 return true;
221 }
222 125 return false;
223 }
224
225 /** @pre try_win() returned true. */
226 8 void set_winner_exception(std::exception_ptr ep) noexcept
227 {
228 8 winner_exception_ = ep;
229 8 }
230
231 // Runners signal completion directly via final_suspend; no member function needed.
232 };
233
234 /** Shared state for heterogeneous when_any operation.
235
236 Coordinates winner selection, result storage, and completion tracking
237 for all child tasks in a when_any operation. Uses composition with
238 when_any_core for shared functionality.
239
240 @par Lifetime
241 Allocated on the parent coroutine's frame, outlives all runners.
242
243 @tparam T0 First task's result type.
244 @tparam Ts Remaining tasks' result types.
245 */
246 template<typename T0, typename... Ts>
247 struct when_any_state
248 {
249 static constexpr std::size_t task_count = 1 + sizeof...(Ts);
250 using variant_type = unique_variant_t<T0, Ts...>;
251
252 when_any_core core_;
253 std::optional<variant_type> result_;
254 std::array<std::coroutine_handle<>, task_count> runner_handles_{};
255
256 43 when_any_state()
257 43 : core_(task_count)
258 {
259 43 }
260
261 // Runners self-destruct in final_suspend. No destruction needed here.
262
263 /** @pre core_.try_win() returned true.
264 @note Uses in_place_type (not index) because variant is deduplicated.
265 */
266 template<typename T>
267 35 void set_winner_result(T value)
268 noexcept(std::is_nothrow_move_constructible_v<T>)
269 {
270 35 result_.emplace(std::in_place_type<T>, std::move(value));
271 35 }
272
273 /** @pre core_.try_win() returned true. */
274 3 void set_winner_void() noexcept
275 {
276 3 result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
277 3 }
278 };
279
280 /** Wrapper coroutine that runs a single child task for when_any.
281
282 Propagates executor/stop_token to the child, attempts to claim winner
283 status on completion, and signals completion for cleanup coordination.
284
285 @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
286 */
287 template<typename StateType>
288 struct when_any_runner
289 {
290 struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
291 {
292 StateType* state_ = nullptr;
293 std::size_t index_ = 0;
294 io_env env_;
295
296 190 when_any_runner get_return_object() noexcept
297 {
298 190 return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
299 }
300
301 // Starts suspended; launcher sets up state/ex/token then resumes
302 190 std::suspend_always initial_suspend() noexcept
303 {
304 190 return {};
305 }
306
307 190 auto final_suspend() noexcept
308 {
309 struct awaiter
310 {
311 promise_type* p_;
312 bool await_ready() const noexcept { return false; }
313 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept
314 {
315 // Extract everything needed before self-destruction.
316 auto& core = p_->state_->core_;
317 auto* counter = &core.remaining_count_;
318 auto* caller_env = core.caller_env_;
319 auto cont = core.continuation_;
320
321 h.destroy();
322
323 // If last runner, dispatch parent for symmetric transfer.
324 auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
325 if(remaining == 1)
326 return caller_env->executor.dispatch(cont);
327 return std::noop_coroutine();
328 }
329 void await_resume() const noexcept {}
330 };
331 190 return awaiter{this};
332 }
333
334 178 void return_void() noexcept {}
335
336 // Exceptions are valid completions in when_any (unlike when_all)
337 12 void unhandled_exception()
338 {
339 12 if(state_->core_.try_win(index_))
340 8 state_->core_.set_winner_exception(std::current_exception());
341 12 }
342
343 /** Injects executor and stop token into child awaitables. */
344 template<class Awaitable>
345 struct transform_awaiter
346 {
347 std::decay_t<Awaitable> a_;
348 promise_type* p_;
349
350 190 bool await_ready() { return a_.await_ready(); }
351 190 auto await_resume() { return a_.await_resume(); }
352
353 template<class Promise>
354 185 auto await_suspend(std::coroutine_handle<Promise> h)
355 {
356 #ifdef _MSC_VER
357 using R = decltype(a_.await_suspend(h, &p_->env_));
358 if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
359 a_.await_suspend(h, &p_->env_).resume();
360 else
361 return a_.await_suspend(h, &p_->env_);
362 #else
363 185 return a_.await_suspend(h, &p_->env_);
364 #endif
365 }
366 };
367
368 template<class Awaitable>
369 190 auto await_transform(Awaitable&& a)
370 {
371 using A = std::decay_t<Awaitable>;
372 if constexpr (IoAwaitable<A>)
373 {
374 return transform_awaiter<Awaitable>{
375 380 std::forward<Awaitable>(a), this};
376 }
377 else
378 {
379 static_assert(sizeof(A) == 0, "requires IoAwaitable");
380 }
381 190 }
382 };
383
384 std::coroutine_handle<promise_type> h_;
385
386 190 explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
387 190 : h_(h)
388 {
389 190 }
390
391 // Enable move for all clang versions - some versions need it
392 when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
393
394 // Non-copyable
395 when_any_runner(when_any_runner const&) = delete;
396 when_any_runner& operator=(when_any_runner const&) = delete;
397 when_any_runner& operator=(when_any_runner&&) = delete;
398
399 190 auto release() noexcept
400 {
401 190 return std::exchange(h_, nullptr);
402 }
403 };
404
405 /** Wraps a child awaitable, attempts to claim winner on completion.
406
407 Uses requires-expressions to detect state capabilities:
408 - set_winner_void(): for heterogeneous void tasks (stores monostate)
409 - set_winner_result(): for non-void tasks
410 - Neither: for homogeneous void tasks (no result storage)
411 */
412 template<IoAwaitable Awaitable, typename StateType>
413 when_any_runner<StateType>
414 190 make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
415 {
416 using T = awaitable_result_t<Awaitable>;
417 if constexpr (std::is_void_v<T>)
418 {
419 co_await std::move(inner);
420 if(state->core_.try_win(index))
421 {
422 // Heterogeneous void tasks store monostate in the variant
423 if constexpr (requires { state->set_winner_void(); })
424 state->set_winner_void();
425 // Homogeneous void tasks have no result to store
426 }
427 }
428 else
429 {
430 auto result = co_await std::move(inner);
431 if(state->core_.try_win(index))
432 {
433 // Defensive: move should not throw (already moved once), but we
434 // catch just in case since an uncaught exception would be devastating.
435 try
436 {
437 state->set_winner_result(std::move(result));
438 }
439 catch(...)
440 {
441 state->core_.set_winner_exception(std::current_exception());
442 }
443 }
444 }
445 380 }
446
447 /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
448 template<IoAwaitable... Awaitables>
449 class when_any_launcher
450 {
451 using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
452
453 std::tuple<Awaitables...>* tasks_;
454 state_type* state_;
455
456 public:
457 43 when_any_launcher(
458 std::tuple<Awaitables...>* tasks,
459 state_type* state)
460 43 : tasks_(tasks)
461 43 , state_(state)
462 {
463 43 }
464
465 43 bool await_ready() const noexcept
466 {
467 43 return sizeof...(Awaitables) == 0;
468 }
469
470 /** CRITICAL: If the last task finishes synchronously, parent resumes and
471 destroys this object before await_suspend returns. Must not reference
472 `this` after the final launch_one call.
473 */
474 43 std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
475 {
476 43 state_->core_.continuation_ = continuation;
477 43 state_->core_.caller_env_ = caller_env;
478
479 43 if(caller_env->stop_token.stop_possible())
480 {
481 18 state_->core_.parent_stop_callback_.emplace(
482 9 caller_env->stop_token,
483 9 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
484
485 9 if(caller_env->stop_token.stop_requested())
486 3 state_->core_.stop_source_.request_stop();
487 }
488
489 43 auto token = state_->core_.stop_source_.get_token();
490 [&]<std::size_t... Is>(std::index_sequence<Is...>) {
491 (..., launch_one<Is>(caller_env->executor, token));
492 43 }(std::index_sequence_for<Awaitables...>{});
493
494 86 return std::noop_coroutine();
495 43 }
496
497 43 void await_resume() const noexcept
498 {
499 43 }
500
501 private:
502 /** @pre Ex::dispatch() and std::coroutine_handle<>::resume() must not throw (handle may leak). */
503 template<std::size_t I>
504 105 void launch_one(executor_ref caller_ex, std::stop_token token)
505 {
506 105 auto runner = make_when_any_runner(
507 105 std::move(std::get<I>(*tasks_)), state_, I);
508
509 105 auto h = runner.release();
510 105 h.promise().state_ = state_;
511 105 h.promise().index_ = I;
512 105 h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->allocator};
513
514 105 std::coroutine_handle<> ch{h};
515 105 state_->runner_handles_[I] = ch;
516 105 caller_ex.post(ch);
517 210 }
518 };
519
520 } // namespace detail
521
522 /** Wait for the first awaitable to complete.
523
524 Races multiple heterogeneous awaitables concurrently and returns when the
525 first one completes. The result includes the winner's index and a
526 deduplicated variant containing the result value.
527
528 @par Suspends
529 The calling coroutine suspends when co_await is invoked. All awaitables
530 are launched concurrently and execute in parallel. The coroutine resumes
531 only after all awaitables have completed, even though the winner is
532 determined by the first to finish.
533
534 @par Completion Conditions
535 @li Winner is determined when the first awaitable completes (success or exception)
536 @li Only one task can claim winner status via atomic compare-exchange
537 @li Once a winner exists, stop is requested for all remaining siblings
538 @li Parent coroutine resumes only after all siblings acknowledge completion
539 @li The winner's result is returned; if the winner threw, the exception is rethrown
540
541 @par Cancellation Semantics
542 Cancellation is supported via stop_token propagated through the
543 IoAwaitable protocol:
544 @li Each child awaitable receives a stop_token derived from a shared stop_source
545 @li When the parent's stop token is activated, the stop is forwarded to all children
546 @li When a winner is determined, stop_source_.request_stop() is called immediately
547 @li Siblings must handle cancellation gracefully and complete before parent resumes
548 @li Stop requests are cooperative; tasks must check and respond to them
549
550 @par Concurrency/Overlap
551 All awaitables are launched concurrently before any can complete.
552 The launcher iterates through the arguments, starting each task on the
553 caller's executor. Tasks may execute in parallel on multi-threaded
554 executors or interleave on single-threaded executors. There is no
555 guaranteed ordering of task completion.
556
557 @par Notable Error Conditions
558 @li Winner exception: if the winning task threw, that exception is rethrown
559 @li Non-winner exceptions: silently discarded (only winner's result matters)
560 @li Cancellation: tasks may complete via cancellation without throwing
561
562 @par Example
563 @code
564 task<void> example() {
565 auto [index, result] = co_await when_any(
566 fetch_from_primary(), // task<Response>
567 fetch_from_backup() // task<Response>
568 );
569 // index is 0 or 1, result holds the winner's Response
570 auto response = std::get<Response>(result);
571 }
572 @endcode
573
574 @par Example with Heterogeneous Types
575 @code
576 task<void> mixed_types() {
577 auto [index, result] = co_await when_any(
578 fetch_int(), // task<int>
579 fetch_string() // task<std::string>
580 );
581 if (index == 0)
582 std::cout << "Got int: " << std::get<int>(result) << "\n";
583 else
584 std::cout << "Got string: " << std::get<std::string>(result) << "\n";
585 }
586 @endcode
587
588 @tparam A0 First awaitable type (must satisfy IoAwaitable).
589 @tparam As Remaining awaitable types (must satisfy IoAwaitable).
590 @param a0 The first awaitable to race.
591 @param as Additional awaitables to race concurrently.
592 @return A task yielding a pair of (winner_index, result_variant).
593
594 @throws Rethrows the winner's exception if the winning task threw an exception.
595
596 @par Remarks
597 Awaitables are moved into the coroutine frame; original objects become
598 empty after the call. When multiple awaitables share the same return type,
599 the variant is deduplicated to contain only unique types. Use the winner
600 index to determine which awaitable completed first. Void awaitables
601 contribute std::monostate to the variant.
602
603 @see when_all, IoAwaitable
604 */
605 template<IoAwaitable A0, IoAwaitable... As>
606 43 [[nodiscard]] auto when_any(A0 a0, As... as)
607 -> task<detail::when_any_result_t<
608 detail::awaitable_result_t<A0>,
609 detail::awaitable_result_t<As>...>>
610 {
611 using result_type = detail::when_any_result_t<
612 detail::awaitable_result_t<A0>,
613 detail::awaitable_result_t<As>...>;
614
615 detail::when_any_state<
616 detail::awaitable_result_t<A0>,
617 detail::awaitable_result_t<As>...> state;
618 std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);
619
620 co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);
621
622 if(state.core_.winner_exception_)
623 std::rethrow_exception(state.core_.winner_exception_);
624
625 co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
626 86 }
627
628 /** Concept for ranges of full I/O awaitables.
629
630 A range satisfies `IoAwaitableRange` if it is a sized input range
631 whose value type satisfies @ref IoAwaitable. This enables when_any
632 to accept any container or view of awaitables, not just std::vector.
633
634 @tparam R The range type.
635
636 @par Requirements
637 @li `R` must satisfy `std::ranges::input_range`
638 @li `R` must satisfy `std::ranges::sized_range`
639 @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
640
641 @par Syntactic Requirements
642 Given `r` of type `R`:
643 @li `std::ranges::begin(r)` is valid
644 @li `std::ranges::end(r)` is valid
645 @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
646 @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
647
648 @par Example
649 @code
650 template<IoAwaitableRange R>
651 task<void> race_all(R&& awaitables) {
652 auto winner = co_await when_any(std::forward<R>(awaitables));
653 // Process winner...
654 }
655 @endcode
656
657 @see when_any, IoAwaitable
658 */
659 template<typename R>
660 concept IoAwaitableRange =
661 std::ranges::input_range<R> &&
662 std::ranges::sized_range<R> &&
663 IoAwaitable<std::ranges::range_value_t<R>>;
664
665 namespace detail {
666
667 /** Shared state for homogeneous when_any (range overload).
668
669 Uses composition with when_any_core for shared functionality.
670 Simpler than heterogeneous: optional<T> instead of variant, vector
671 instead of array for runner handles.
672 */
673 template<typename T>
674 struct when_any_homogeneous_state
675 {
676 when_any_core core_;
677 std::optional<T> result_;
678 std::vector<std::coroutine_handle<>> runner_handles_;
679
680 19 explicit when_any_homogeneous_state(std::size_t count)
681 19 : core_(count)
682 38 , runner_handles_(count)
683 {
684 19 }
685
686 // Runners self-destruct in final_suspend. No destruction needed here.
687
688 /** @pre core_.try_win() returned true. */
689 17 void set_winner_result(T value)
690 noexcept(std::is_nothrow_move_constructible_v<T>)
691 {
692 17 result_.emplace(std::move(value));
693 17 }
694 };
695
696 /** Specialization for void tasks (no result storage needed). */
697 template<>
698 struct when_any_homogeneous_state<void>
699 {
700 when_any_core core_;
701 std::vector<std::coroutine_handle<>> runner_handles_;
702
703 3 explicit when_any_homogeneous_state(std::size_t count)
704 3 : core_(count)
705 6 , runner_handles_(count)
706 {
707 3 }
708
709 // Runners self-destruct in final_suspend. No destruction needed here.
710
711 // No set_winner_result - void tasks have no result to store
712 };
713
714 /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
715 template<IoAwaitableRange Range>
716 class when_any_homogeneous_launcher
717 {
718 using Awaitable = std::ranges::range_value_t<Range>;
719 using T = awaitable_result_t<Awaitable>;
720
721 Range* range_;
722 when_any_homogeneous_state<T>* state_;
723
724 public:
725 22 when_any_homogeneous_launcher(
726 Range* range,
727 when_any_homogeneous_state<T>* state)
728 22 : range_(range)
729 22 , state_(state)
730 {
731 22 }
732
733 22 bool await_ready() const noexcept
734 {
735 22 return std::ranges::empty(*range_);
736 }
737
738 /** CRITICAL: If the last task finishes synchronously, parent resumes and
739 destroys this object before await_suspend returns. Must not reference
740 `this` after dispatching begins.
741
742 Two-phase approach:
743 1. Create all runners (safe - no dispatch yet)
744 2. Dispatch all runners (any may complete synchronously)
745 */
746 22 std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
747 {
748 22 state_->core_.continuation_ = continuation;
749 22 state_->core_.caller_env_ = caller_env;
750
751 22 if(caller_env->stop_token.stop_possible())
752 {
753 14 state_->core_.parent_stop_callback_.emplace(
754 7 caller_env->stop_token,
755 7 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
756
757 7 if(caller_env->stop_token.stop_requested())
758 4 state_->core_.stop_source_.request_stop();
759 }
760
761 22 auto token = state_->core_.stop_source_.get_token();
762
763 // Phase 1: Create all runners without dispatching.
764 // This iterates over *range_ safely because no runners execute yet.
765 22 std::size_t index = 0;
766 107 for(auto&& a : *range_)
767 {
768 85 auto runner = make_when_any_runner(
769 85 std::move(a), state_, index);
770
771 85 auto h = runner.release();
772 85 h.promise().state_ = state_;
773 85 h.promise().index_ = index;
774 85 h.promise().env_ = io_env{caller_env->executor, token, caller_env->allocator};
775
776 85 state_->runner_handles_[index] = std::coroutine_handle<>{h};
777 85 ++index;
778 }
779
780 // Phase 2: Post all runners. Any may complete synchronously.
781 // After last post, state_ and this may be destroyed.
782 // Use raw pointer/count captured before posting.
783 22 std::coroutine_handle<>* handles = state_->runner_handles_.data();
784 22 std::size_t count = state_->runner_handles_.size();
785 107 for(std::size_t i = 0; i < count; ++i)
786 85 caller_env->executor.post(handles[i]);
787
788 44 return std::noop_coroutine();
789 107 }
790
791 22 void await_resume() const noexcept
792 {
793 22 }
794 };
795
796 } // namespace detail
797
798 /** Wait for the first awaitable to complete (range overload).
799
800 Races a range of awaitables with the same result type. Accepts any
801 sized input range of IoAwaitable types, enabling use with arrays,
802 spans, or custom containers.
803
804 @par Suspends
805 The calling coroutine suspends when co_await is invoked. All awaitables
806 in the range are launched concurrently and execute in parallel. The
807 coroutine resumes only after all awaitables have completed, even though
808 the winner is determined by the first to finish.
809
810 @par Completion Conditions
811 @li Winner is determined when the first awaitable completes (success or exception)
812 @li Only one task can claim winner status via atomic compare-exchange
813 @li Once a winner exists, stop is requested for all remaining siblings
814 @li Parent coroutine resumes only after all siblings acknowledge completion
815 @li The winner's index and result are returned; if the winner threw, the exception is rethrown
816
817 @par Cancellation Semantics
818 Cancellation is supported via stop_token propagated through the
819 IoAwaitable protocol:
820 @li Each child awaitable receives a stop_token derived from a shared stop_source
821 @li When the parent's stop token is activated, the stop is forwarded to all children
822 @li When a winner is determined, stop_source_.request_stop() is called immediately
823 @li Siblings must handle cancellation gracefully and complete before parent resumes
824 @li Stop requests are cooperative; tasks must check and respond to them
825
826 @par Concurrency/Overlap
827 All awaitables are launched concurrently before any can complete.
828 The launcher iterates through the range, starting each task on the
829 caller's executor. Tasks may execute in parallel on multi-threaded
830 executors or interleave on single-threaded executors. There is no
831 guaranteed ordering of task completion.
832
833 @par Notable Error Conditions
834 @li Empty range: throws std::invalid_argument immediately (not via co_return)
835 @li Winner exception: if the winning task threw, that exception is rethrown
836 @li Non-winner exceptions: silently discarded (only winner's result matters)
837 @li Cancellation: tasks may complete via cancellation without throwing
838
839 @par Example
840 @code
841 task<void> example() {
842 std::array<task<Response>, 3> requests = {
843 fetch_from_server(0),
844 fetch_from_server(1),
845 fetch_from_server(2)
846 };
847
848 auto [index, response] = co_await when_any(std::move(requests));
849 }
850 @endcode
851
852 @par Example with Vector
853 @code
854 task<Response> fetch_fastest(std::vector<Server> const& servers) {
855 std::vector<task<Response>> requests;
856 for (auto const& server : servers)
857 requests.push_back(fetch_from(server));
858
859 auto [index, response] = co_await when_any(std::move(requests));
860 co_return response;
861 }
862 @endcode
863
864 @tparam R Range type satisfying IoAwaitableRange.
865 @param awaitables Range of awaitables to race concurrently (must not be empty).
866 @return A task yielding a pair of (winner_index, result).
867
868 @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
869 @throws Rethrows the winner's exception if the winning task threw an exception.
870
871 @par Remarks
872 Elements are moved from the range; for lvalue ranges, the original
873 container will have moved-from elements after this call. The range
874 is moved onto the coroutine frame to ensure lifetime safety. Unlike
875 the variadic overload, no variant wrapper is needed since all tasks
876 share the same return type.
877
878 @see when_any, IoAwaitableRange
879 */
880 template<IoAwaitableRange R>
881 requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
882 21 [[nodiscard]] auto when_any(R&& awaitables)
883 -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
884 {
885 using Awaitable = std::ranges::range_value_t<R>;
886 using T = detail::awaitable_result_t<Awaitable>;
887 using result_type = std::pair<std::size_t, T>;
888 using OwnedRange = std::remove_cvref_t<R>;
889
890 auto count = std::ranges::size(awaitables);
891 if(count == 0)
892 throw std::invalid_argument("when_any requires at least one awaitable");
893
894 // Move/copy range onto coroutine frame to ensure lifetime
895 OwnedRange owned_awaitables = std::forward<R>(awaitables);
896
897 detail::when_any_homogeneous_state<T> state(count);
898
899 co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
900
901 if(state.core_.winner_exception_)
902 std::rethrow_exception(state.core_.winner_exception_);
903
904 co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
905 42 }
906
907 /** Wait for the first awaitable to complete (void range overload).
908
909 Races a range of void-returning awaitables. Since void awaitables have
910 no result value, only the winner's index is returned.
911
912 @par Suspends
913 The calling coroutine suspends when co_await is invoked. All awaitables
914 in the range are launched concurrently and execute in parallel. The
915 coroutine resumes only after all awaitables have completed, even though
916 the winner is determined by the first to finish.
917
918 @par Completion Conditions
919 @li Winner is determined when the first awaitable completes (success or exception)
920 @li Only one task can claim winner status via atomic compare-exchange
921 @li Once a winner exists, stop is requested for all remaining siblings
922 @li Parent coroutine resumes only after all siblings acknowledge completion
923 @li The winner's index is returned; if the winner threw, the exception is rethrown
924
925 @par Cancellation Semantics
926 Cancellation is supported via stop_token propagated through the
927 IoAwaitable protocol:
928 @li Each child awaitable receives a stop_token derived from a shared stop_source
929 @li When the parent's stop token is activated, the stop is forwarded to all children
930 @li When a winner is determined, stop_source_.request_stop() is called immediately
931 @li Siblings must handle cancellation gracefully and complete before parent resumes
932 @li Stop requests are cooperative; tasks must check and respond to them
933
934 @par Concurrency/Overlap
935 All awaitables are launched concurrently before any can complete.
936 The launcher iterates through the range, starting each task on the
937 caller's executor. Tasks may execute in parallel on multi-threaded
938 executors or interleave on single-threaded executors. There is no
939 guaranteed ordering of task completion.
940
941 @par Notable Error Conditions
942 @li Empty range: throws std::invalid_argument immediately (not via co_return)
943 @li Winner exception: if the winning task threw, that exception is rethrown
944 @li Non-winner exceptions: silently discarded (only winner's result matters)
945 @li Cancellation: tasks may complete via cancellation without throwing
946
947 @par Example
948 @code
949 task<void> example() {
950 std::vector<task<void>> tasks;
951 for (int i = 0; i < 5; ++i)
952 tasks.push_back(background_work(i));
953
954 std::size_t winner = co_await when_any(std::move(tasks));
955 // winner is the index of the first task to complete
956 }
957 @endcode
958
959 @par Example with Timeout
960 @code
961 task<void> with_timeout() {
962 std::vector<task<void>> tasks;
963 tasks.push_back(long_running_operation());
964 tasks.push_back(delay(std::chrono::seconds(5)));
965
966 std::size_t winner = co_await when_any(std::move(tasks));
967 if (winner == 1) {
968 // Timeout occurred
969 }
970 }
971 @endcode
972
973 @tparam R Range type satisfying IoAwaitableRange with void result.
974 @param awaitables Range of void awaitables to race concurrently (must not be empty).
975 @return A task yielding the winner's index (zero-based).
976
977 @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
978 @throws Rethrows the winner's exception if the winning task threw an exception.
979
980 @par Remarks
981 Elements are moved from the range; for lvalue ranges, the original
982 container will have moved-from elements after this call. The range
983 is moved onto the coroutine frame to ensure lifetime safety. Unlike
984 the non-void overload, no result storage is needed since void tasks
985 produce no value.
986
987 @see when_any, IoAwaitableRange
988 */
989 template<IoAwaitableRange R>
990 requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
991 3 [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
992 {
993 using OwnedRange = std::remove_cvref_t<R>;
994
995 auto count = std::ranges::size(awaitables);
996 if(count == 0)
997 throw std::invalid_argument("when_any requires at least one awaitable");
998
999 // Move/copy range onto coroutine frame to ensure lifetime
1000 OwnedRange owned_awaitables = std::forward<R>(awaitables);
1001
1002 detail::when_any_homogeneous_state<void> state(count);
1003
1004 co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
1005
1006 if(state.core_.winner_exception_)
1007 std::rethrow_exception(state.core_.winner_exception_);
1008
1009 co_return state.core_.winner_index_;
1010 6 }
1011
1012 } // namespace capy
1013 } // namespace boost
1014
1015 #endif
1016