sharded.hh: add invoke_on variant for a shard range
Add a convenient method to invoke a function on a range of shards, to better support the cold control path in more complex compute models, such as shard groups dedicated to different tasks. A workload can benefit from such a model when grouping improves inter-task cooperation from a performance and QoS perspective, for example, or when a computation requires only a subset of shards due to internal concurrency limits.
tomershafir authored and avikivity committed Sep 29, 2024
1 parent 03db01b commit b571539
Showing 3 changed files with 168 additions and 2 deletions.
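
For orientation before the diff, here is a minimal usage sketch of the new range-based invoke_on (not part of the commit). It assumes a hypothetical my_service that is default-constructible and exposes do_foo()/do_bar() methods returning future<>, and it runs inside a seastar thread so that .get() may be used.

    #include <ranges>
    #include <seastar/core/sharded.hh>
    #include <seastar/core/smp.hh>

    // Split the shards into two groups and run a different task on each group,
    // mirroring the shard-group model described in the commit message.
    seastar::sharded<my_service> svc;
    svc.start().get();

    auto mid = seastar::smp::count / 2;
    auto group_a = std::views::iota(0u, mid);                  // shards [0, mid)
    auto group_b = std::views::iota(mid, seastar::smp::count); // shards [mid, smp::count)

    auto fa = svc.invoke_on(group_a, [] (my_service& s) { return s.do_foo(); });
    auto fb = svc.invoke_on(group_b, [] (my_service& s) { return s.do_bar(); });
    fa.get();
    fb.get();

    svc.stop().get();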
88 changes: 86 additions & 2 deletions include/seastar/core/sharded.hh
@@ -35,6 +35,8 @@
#include <boost/iterator/counting_iterator.hpp>
#include <concepts>
#include <functional>
#include <ranges>
#include <type_traits>
#endif

/// \defgroup smp-module Multicore
@@ -100,6 +102,10 @@ using sharded_unwrap_evaluated_t = typename sharded_unwrap<T>::evaluated_type;
template <typename T>
using sharded_unwrap_t = typename sharded_unwrap<T>::type;

template<typename R>
concept unsigned_range = std::ranges::range<R>
&& std::is_unsigned_v<std::ranges::range_value_t<R>>;

} // internal


@@ -190,7 +196,7 @@ class sharded {
};
std::vector<entry> _instances;
private:
using invoke_on_all_func_type = std::function<future<> (Service&)>;
using invoke_on_multiple_func_type = std::function<future<> (Service&)>;
private:
template <typename U, bool async>
friend struct shared_ptr_make_helper;
@@ -338,6 +344,42 @@ public:
}
}

/// Invoke a callable on a range of instances of `Service`.
///
/// \param range std::ranges::range of unsigned integers
/// \param options the options to forward to the \ref smp::submit_to()
/// called behind the scenes.
/// \param func a callable with signature `Value (Service&, Args...)` or
/// `future<Value> (Service&, Args...)` (for some `Value` type), or a pointer
/// to a member function of Service
/// \param args parameters to the callable; will be copied or moved. To pass by reference,
/// use std::ref().
/// \return Future that becomes ready once all calls have completed
template <typename R, typename Func, typename... Args>
requires std::invocable<Func, Service&, Args...>
&& std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
&& internal::unsigned_range<R>
future<>
invoke_on(R range, smp_submit_to_options options, Func func, Args... args) noexcept;

/// Invoke a callable on a range of instances of `Service`.
/// Passes the default \ref smp_submit_to_options to the
/// \ref smp::submit_to() called behind the scenes.
///
/// \param range std::ranges::range of unsigned integers
/// \param func a callable with signature `Value (Service&, Args...)` or
/// `future<Value> (Service&, Args...)` (for some `Value` type), or a pointer
/// to a member function of Service
/// \param args parameters to the callable; will be copied or moved. To pass by reference,
/// use std::ref().
/// \return Future that becomes ready once all calls have completed
template <typename R, typename Func, typename... Args>
requires std::invocable<Func, Service&, Args...>
&& std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
&& internal::unsigned_range<R>
future<>
invoke_on(R range, Func func, Args... args) noexcept;

/// Invoke a callable on all instances of `Service` and reduce the results using
/// `Reducer`.
///
@@ -761,7 +803,7 @@ sharded<Service>::invoke_on_all(smp_submit_to_options options, Func func, Args..
static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>,
"invoke_on_all()'s func must return void or future<>");
try {
return invoke_on_all(options, invoke_on_all_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
return invoke_on_all(options, invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
return std::apply([&service, &func] (Args&&... args) mutable {
return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
}, std::move(args));
@@ -788,6 +830,48 @@ sharded<Service>::invoke_on_others(smp_submit_to_options options, Func func, Arg
}
}

template <typename Service>
template <typename R, typename Func, typename... Args>
requires std::invocable<Func, Service&, Args...>
&& std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
&& internal::unsigned_range<R>
inline
future<>
sharded<Service>::invoke_on(R range, smp_submit_to_options options, Func func, Args... args) noexcept {
try {
auto func_futurized = invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
// Avoid false-positive unused-lambda-capture warning on Clang
(void)args;
return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
});
return parallel_for_each(range, [this, options, func = std::move(func_futurized)] (unsigned s) {
if (s > smp::count - 1) {
throw std::invalid_argument(format("Invalid shard id in range: {}. Must be in range [0,{})", s, smp::count));
}
return smp::submit_to(s, options, [this, func] {
return func(*get_local_service());
});
});
} catch(...) {
return current_exception_as_future();
}
}

template <typename Service>
template <typename R, typename Func, typename... Args>
requires std::invocable<Func, Service&, Args...>
&& std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
&& internal::unsigned_range<R>
inline
future<>
sharded<Service>::invoke_on(R range, Func func, Args... args) noexcept {
try {
return invoke_on(std::forward<R>(range), smp_submit_to_options{}, std::move(func), std::move(args)...);
} catch(...) {
return current_exception_as_future();
}
}

template <typename Service>
const Service& sharded<Service>::local() const noexcept {
assert(local_is_initialized());
1 change: 1 addition & 0 deletions src/seastar.cc
@@ -75,6 +75,7 @@ module;
#include <optional>
#include <queue>
#include <random>
#include <ranges>
#include <regex>
#include <source_location>
#include <sstream>
81 changes: 81 additions & 0 deletions tests/unit/sharded_test.cc
@@ -21,7 +21,11 @@

#include <seastar/testing/thread_test_case.hh>

#include <seastar/core/shard_id.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>

#include <ranges>

using namespace seastar;

@@ -221,3 +225,80 @@ SEASTAR_THREAD_TEST_CASE(invoke_on_modifiers) {

srv.stop().get();
}

class coordinator_synced_shard_map : public peering_sharded_service<coordinator_synced_shard_map> {
std::vector<unsigned> unsigned_per_shard;
unsigned coordinator_id;

public:
coordinator_synced_shard_map(unsigned coordinator_id) : unsigned_per_shard(smp::count), coordinator_id(coordinator_id) {}

future<> sync(unsigned value) {
return container().invoke_on(coordinator_id, [shard_id = this_shard_id(), value] (coordinator_synced_shard_map& s) {
s.unsigned_per_shard[shard_id] = value;
});
}

unsigned get_synced(int shard_id) {
assert(this_shard_id() == coordinator_id);
return unsigned_per_shard[shard_id];
}
};

SEASTAR_THREAD_TEST_CASE(invoke_on_range_contiguous) {
sharded<coordinator_synced_shard_map> s;
auto coordinator_id = this_shard_id();
s.start(coordinator_id).get();

auto mid = smp::count / 2;
auto half1 = std::views::iota(0u, mid);
auto half1_id = 1;
auto half2 = std::views::iota(mid, smp::count);
auto half2_id = 2;

auto f1 = s.invoke_on(half1, [half1_id] (coordinator_synced_shard_map& s) { return s.sync(half1_id); });
auto f2 = s.invoke_on(half2, [half2_id] (coordinator_synced_shard_map& s) { return s.sync(half2_id); });
f1.get();
f2.get();

auto f3 = s.invoke_on(coordinator_id, [mid, half1_id, half2_id] (coordinator_synced_shard_map& s) {
for (unsigned i = 0; i < mid; ++i) {
BOOST_REQUIRE_EQUAL(half1_id, s.get_synced(i));
}
for (unsigned i = mid; i < smp::count; ++i) {
BOOST_REQUIRE_EQUAL(half2_id, s.get_synced(i));
}
});
f3.get();

s.stop().get();
}

SEASTAR_THREAD_TEST_CASE(invoke_on_range_fragmented) {
sharded<coordinator_synced_shard_map> s;
auto coordinator_id = this_shard_id();
s.start(coordinator_id).get();

// TODO: migrate to C++23 std::views::stride
auto even = std::views::iota(0u, smp::count) | std::views::filter([](int i) { return i % 2 == 0; });
auto even_id = 1;
auto odd = std::views::iota(1u, smp::count) | std::views::filter([](int i) { return i % 2 == 1; });
auto odd_id = 2;

auto f1 = s.invoke_on(even, [even_id] (coordinator_synced_shard_map& s) { return s.sync(even_id); });
auto f2 = s.invoke_on(odd, [odd_id] (coordinator_synced_shard_map& s) { return s.sync(odd_id); });
f1.get();
f2.get();

auto f3 = s.invoke_on(coordinator_id, [even_id, odd_id] (coordinator_synced_shard_map& s) {
for (unsigned i = 0; i < smp::count; i += 2) {
BOOST_REQUIRE_EQUAL(even_id, s.get_synced(i));
}
for (unsigned i = 1; i < smp::count; i += 2) {
BOOST_REQUIRE_EQUAL(odd_id, s.get_synced(i));
}
});
f3.get();

s.stop().get();
}
