Skip to content

Commit

Permalink
loop: add range support to parallel_for_each()
Browse files Browse the repository at this point in the history
before this change, we only support containers, whose `begin()` and
`end()` are of the same type, but ranges do not have this requirement.

in order to support range views, let's relax this requirement. so that,
we can pass, for instance, `std::views::iota(1, 10)` to `parallel_for_each()`.
see also b1b0bff.

Signed-off-by: Kefu Chai <[email protected]>
  • Loading branch information
tchaikov committed Oct 5, 2024
1 parent a30ec0c commit bc29141
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions include/seastar/core/loop.hh
Original file line number Diff line number Diff line change
Expand Up @@ -381,15 +381,15 @@ future<> keep_doing(AsyncAction action) noexcept {
}

namespace internal {
template <typename Iterator, typename AsyncAction>
template <typename Iterator, class Sentinel, typename AsyncAction>
class do_for_each_state final : public continuation_base<> {
Iterator _begin;
Iterator _end;
Sentinel _end;
AsyncAction _action;
promise<> _pr;

public:
do_for_each_state(Iterator begin, Iterator end, AsyncAction action, future<>&& first_unavailable)
do_for_each_state(Iterator begin, Sentinel end, AsyncAction action, future<>&& first_unavailable)
: _begin(std::move(begin)), _end(std::move(end)), _action(std::move(action)) {
internal::set_callback(std::move(first_unavailable), this);
}
Expand Down Expand Up @@ -422,16 +422,16 @@ public:
}
};

template<typename Iterator, typename AsyncAction>
template<typename Iterator, typename Sentinel, typename AsyncAction>
inline
future<> do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) {
future<> do_for_each_impl(Iterator begin, Sentinel end, AsyncAction action) {
while (begin != end) {
auto f = futurize_invoke(action, *begin++);
if (f.failed()) {
return f;
}
if (!f.available() || need_preempt()) {
auto* s = new internal::do_for_each_state<Iterator, AsyncAction>{
auto* s = new internal::do_for_each_state<Iterator, Sentinel, AsyncAction>{
std::move(begin), std::move(end), std::move(action), std::move(f)};
return s->get_future();
}
Expand All @@ -454,12 +454,15 @@ future<> do_for_each_impl(Iterator begin, Iterator end, AsyncAction action) {
/// when it is acceptable to process the next item.
/// \return a ready future on success, or the first failed future if
/// \c action failed.
template<typename Iterator, typename AsyncAction>
requires requires (Iterator i, AsyncAction aa) {
{ futurize_invoke(aa, *i) } -> std::same_as<future<>>;
}
template<typename Iterator, typename Sentinel, typename AsyncAction>
requires (
requires (Iterator i, AsyncAction aa) {
{ futurize_invoke(aa, *i) } -> std::same_as<future<>>;
} &&
(std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)
)
inline
future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept {
future<> do_for_each(Iterator begin, Sentinel end, AsyncAction action) noexcept {
try {
return internal::do_for_each_impl(std::move(begin), std::move(end), std::move(action));
} catch (...) {
Expand All @@ -472,19 +475,19 @@ future<> do_for_each(Iterator begin, Iterator end, AsyncAction action) noexcept
/// For each item in a range, call a function, waiting for the previous
/// invocation to complete before calling the next one.
///
/// \param c an \c Container object designating input range
/// \param c an \c Range object designating input range
/// \param action a callable, taking a reference to objects from the range
/// as a parameter, and returning a \c future<> that resolves
/// when it is acceptable to process the next item.
/// \return a ready future on success, or the first failed future if
/// \c action failed.
template<typename Container, typename AsyncAction>
requires requires (Container c, AsyncAction aa) {
template<typename Range, typename AsyncAction>
requires requires (Range c, AsyncAction aa) {
{ futurize_invoke(aa, *std::begin(c)) } -> std::same_as<future<>>;
std::end(c);
}
inline
future<> do_for_each(Container& c, AsyncAction action) noexcept {
future<> do_for_each(Range& c, AsyncAction action) noexcept {
try {
return internal::do_for_each_impl(std::begin(c), std::end(c), std::move(action));
} catch (...) {
Expand Down

0 comments on commit bc29141

Please sign in to comment.