Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sharded.hh: add invoke_on variant for a shard range #2449

Merged
merged 1 commit into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 86 additions & 2 deletions include/seastar/core/sharded.hh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include <boost/iterator/counting_iterator.hpp>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sharded.hh: add invoke_on variant for a shard range

Missing patch changelog

#include <concepts>
#include <functional>
#include <ranges>
#include <type_traits>
#endif

/// \defgroup smp-module Multicore
Expand Down Expand Up @@ -100,6 +102,10 @@ using sharded_unwrap_evaluated_t = typename sharded_unwrap<T>::evaluated_type;
template <typename T>
using sharded_unwrap_t = typename sharded_unwrap<T>::type;

template<typename R>
concept unsigned_range = std::ranges::range<R>
&& std::is_unsigned_v<std::ranges::range_value_t<R>>;

} // internal


Expand Down Expand Up @@ -190,7 +196,7 @@ class sharded {
};
std::vector<entry> _instances;
private:
using invoke_on_all_func_type = std::function<future<> (Service&)>;
using invoke_on_multiple_func_type = std::function<future<> (Service&)>;
private:
template <typename U, bool async>
friend struct shared_ptr_make_helper;
Expand Down Expand Up @@ -338,6 +344,42 @@ public:
}
}

/// Invoke a callable on a range of instances of `Service`.
///
/// \param range std::ranges::range of unsigned integers
/// \param options the options to forward to the \ref smp::submit_to()
/// called behind the scenes.
/// \param func a callable with signature `Value (Service&, Args...)` or
/// `future<Value> (Service&, Args...)` (for some `Value` type), or a pointer
/// to a member function of Service
/// \param args parameters to the callable; will be copied or moved. To pass by reference,
/// use std::ref().
/// \return Future that becomes ready once all calls have completed
template <typename R, typename Func, typename... Args>
requires std::invocable<Func, Service&, Args...>
&& std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
&& internal::unsigned_range<R>
future<>
invoke_on(R range, smp_submit_to_options options, Func func, Args... args) noexcept;

/// Invoke a callable on a range of instances of `Service`.
/// Passes the default \ref smp_submit_to_options to the
/// \ref smp::submit_to() called behind the scenes.
///
/// \param range std::ranges::range of unsigned integers
/// \param func a callable with signature `Value (Service&, Args...)` or
/// `future<Value> (Service&, Args...)` (for some `Value` type), or a pointer
/// to a member function of Service
/// \param args parameters to the callable; will be copied or moved. To pass by reference,
/// use std::ref().
/// \return Future that becomes ready once all calls have completed
template <typename R, typename Func, typename... Args>
requires std::invocable<Func, Service&, Args...>
&& std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
&& internal::unsigned_range<R>
future<>
invoke_on(R range, Func func, Args... args) noexcept;

/// Invoke a callable on all instances of `Service` and reduce the results using
/// `Reducer`.
///
Expand Down Expand Up @@ -761,7 +803,7 @@ sharded<Service>::invoke_on_all(smp_submit_to_options options, Func func, Args..
static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>,
"invoke_on_all()'s func must return void or future<>");
try {
return invoke_on_all(options, invoke_on_all_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
return invoke_on_all(options, invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
return std::apply([&service, &func] (Args&&... args) mutable {
return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
}, std::move(args));
Expand All @@ -788,6 +830,48 @@ sharded<Service>::invoke_on_others(smp_submit_to_options options, Func func, Arg
}
}

template <typename Service>
template <typename R, typename Func, typename... Args>
requires std::invocable<Func, Service&, Args...>
&& std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
&& internal::unsigned_range<R>
inline
future<>
sharded<Service>::invoke_on(R range, smp_submit_to_options options, Func func, Args... args) noexcept {
try {
auto func_futurized = invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
// Avoid false-positive unused-lambda-capture warning on Clang
(void)args;
return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
});
return parallel_for_each(range, [this, options, func = std::move(func_futurized)] (unsigned s) {
if (s > smp::count - 1) {
throw std::invalid_argument(format("Invalid shard id in range: {}. Must be in range [0,{})", s, smp::count));
}
return smp::submit_to(s, options, [this, func] {
return func(*get_local_service());
});
});
} catch(...) {
return current_exception_as_future();
}
}

template <typename Service>
template <typename R, typename Func, typename... Args>
requires std::invocable<Func, Service&, Args...>
&& std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
&& internal::unsigned_range<R>
inline
future<>
sharded<Service>::invoke_on(R range, Func func, Args... args) noexcept {
try {
return invoke_on(std::forward<R>(range), smp_submit_to_options{}, std::move(func), std::move(args)...);
} catch(...) {
return current_exception_as_future();
}
}

template <typename Service>
const Service& sharded<Service>::local() const noexcept {
assert(local_is_initialized());
Expand Down
1 change: 1 addition & 0 deletions src/seastar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ module;
#include <optional>
#include <queue>
#include <random>
#include <ranges>
#include <regex>
#include <source_location>
#include <sstream>
Expand Down
81 changes: 81 additions & 0 deletions tests/unit/sharded_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@

#include <seastar/testing/thread_test_case.hh>

#include <seastar/core/shard_id.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>

#include <ranges>

using namespace seastar;

Expand Down Expand Up @@ -221,3 +225,80 @@ SEASTAR_THREAD_TEST_CASE(invoke_on_modifiers) {

srv.stop().get();
}

class coordinator_synced_shard_map : public peering_sharded_service<coordinator_synced_shard_map> {
std::vector<unsigned> unsigned_per_shard;
unsigned coordinator_id;

public:
coordinator_synced_shard_map(unsigned coordinator_id) : unsigned_per_shard(smp::count), coordinator_id(coordinator_id) {}

future<> sync(unsigned value) {
return container().invoke_on(coordinator_id, [shard_id = this_shard_id(), value] (coordinator_synced_shard_map& s) {
s.unsigned_per_shard[shard_id] = value;
});
}

unsigned get_synced(int shard_id) {
assert(this_shard_id() == coordinator_id);
return unsigned_per_shard[shard_id];
}
};

SEASTAR_THREAD_TEST_CASE(invoke_on_range_contiguous) {
sharded<coordinator_synced_shard_map> s;
auto coordinator_id = this_shard_id();
s.start(coordinator_id).get();

auto mid = smp::count / 2;
auto half1 = std::views::iota(0u, mid);
auto half1_id = 1;
auto half2 = std::views::iota(mid, smp::count);
auto half2_id = 2;

auto f1 = s.invoke_on(half1, [half1_id] (coordinator_synced_shard_map& s) { return s.sync(half1_id); });
auto f2 = s.invoke_on(half2, [half2_id] (coordinator_synced_shard_map& s) { return s.sync(half2_id); });
f1.get();
f2.get();

auto f3 = s.invoke_on(coordinator_id, [mid, half1_id, half2_id] (coordinator_synced_shard_map& s) {
for (unsigned i = 0; i < mid; ++i) {
BOOST_REQUIRE_EQUAL(half1_id, s.get_synced(i));
}
for (unsigned i = mid; i < smp::count; ++i) {
BOOST_REQUIRE_EQUAL(half2_id, s.get_synced(i));
}
});
f3.get();

s.stop().get();
}

SEASTAR_THREAD_TEST_CASE(invoke_on_range_fragmented) {
sharded<coordinator_synced_shard_map> s;
auto coordinator_id = this_shard_id();
s.start(coordinator_id).get();

// TODO: migrate to C++23 std::views::stride
auto even = std::views::iota(0u, smp::count) | std::views::filter([](int i) { return i % 2 == 0; });
auto even_id = 1;
auto odd = std::views::iota(1u, smp::count) | std::views::filter([](int i) { return i % 2 == 1; });
auto odd_id = 2;

auto f1 = s.invoke_on(even, [even_id] (coordinator_synced_shard_map& s) { return s.sync(even_id); });
auto f2 = s.invoke_on(odd, [odd_id] (coordinator_synced_shard_map& s) { return s.sync(odd_id); });
f1.get();
f2.get();

auto f3 = s.invoke_on(coordinator_id, [even_id, odd_id] (coordinator_synced_shard_map& s) {
for (unsigned i = 0; i < smp::count; i += 2) {
BOOST_REQUIRE_EQUAL(even_id, s.get_synced(i));
}
for (unsigned i = 1; i < smp::count; i += 2) {
BOOST_REQUIRE_EQUAL(odd_id, s.get_synced(i));
}
});
f3.get();

s.stop().get();
}