Skip to content

Commit

Permalink
sharded.hh: add invoke_on variant for a shard range
Browse files Browse the repository at this point in the history
  • Loading branch information
tomershafir committed Sep 22, 2024
1 parent 1147ac2 commit 6ce5244
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 2 deletions.
83 changes: 81 additions & 2 deletions include/seastar/core/sharded.hh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include <boost/iterator/counting_iterator.hpp>
#include <concepts>
#include <functional>
#include <ranges>
#include <type_traits>
#endif

/// \defgroup smp-module Multicore
Expand Down Expand Up @@ -190,7 +192,7 @@ class sharded {
};
std::vector<entry> _instances;
private:
using invoke_on_all_func_type = std::function<future<> (Service&)>;
using invoke_on_multiple_func_type = std::function<future<> (Service&)>;
private:
template <typename U, bool async>
friend struct shared_ptr_make_helper;
Expand Down Expand Up @@ -338,6 +340,42 @@ public:
}
}

/// Invoke a callable on a range of instances of `Service`.
///
/// \param begin shard id that begins the range (inclusive)
/// \param end shard id that ends the range (inclusive)
/// \param options the options to forward to the \ref smp::submit_to()
/// called behind the scenes.
/// \param func a callable with signature `Value (Service&, Args...)` or
/// `future<Value> (Service&, Args...)` (for some `Value` type), or a pointer
/// to a member function of Service
/// \param args parameters to the callable; will be copied or moved. To pass by reference,
/// use std::ref().
/// \return Future that becomes ready once all calls have completed
template <typename Func, typename... Args>
requires std::invocable<Func, Service&, Args...>
&& std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
future<>
invoke_on(unsigned begin, unsigned end, smp_submit_to_options options, Func func, Args... args) noexcept;

/// Invoke a callable on a range of instances of `Service`.
/// Passes the default \ref smp_submit_to_options to the
/// \ref smp::submit_to() called behind the scenes.
///
/// \param begin shard id that begins the range (inclusive)
/// \param end shard id that ends the range (inclusive)
/// \param func a callable with signature `Value (Service&, Args...)` or
/// `future<Value> (Service&, Args...)` (for some `Value` type), or a pointer
/// to a member function of Service
/// \param args parameters to the callable; will be copied or moved. To pass by reference,
/// use std::ref().
/// \return Future that becomes ready once all calls have completed
template <typename Func, typename... Args>
requires std::invocable<Func, Service&, Args...>
&& std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
future<>
invoke_on(unsigned begin, unsigned end, Func func, Args... args) noexcept;

/// Invoke a callable on all instances of `Service` and reduce the results using
/// `Reducer`.
///
Expand Down Expand Up @@ -761,7 +799,7 @@ sharded<Service>::invoke_on_all(smp_submit_to_options options, Func func, Args..
static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>,
"invoke_on_all()'s func must return void or future<>");
try {
return invoke_on_all(options, invoke_on_all_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
return invoke_on_all(options, invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
return std::apply([&service, &func] (Args&&... args) mutable {
return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
}, std::move(args));
Expand All @@ -788,6 +826,47 @@ sharded<Service>::invoke_on_others(smp_submit_to_options options, Func func, Arg
}
}

template <typename Service>
template <typename Func, typename... Args>
requires std::invocable<Func, Service&, Args...>
&& std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
inline
future<>
sharded<Service>::invoke_on(unsigned begin, unsigned end, smp_submit_to_options options, Func func, Args... args) noexcept {
try {
if (begin > end || end > seastar::smp::count - 1) {
throw std::invalid_argument(format("Invalid range: [{},{}]. Begin must not be greater than end, and end cannot be greater than the largest shard number", begin, end));
}
auto func_futurized = invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
// Avoid false-positive unused-lambda-capture warning on Clang
(void)args;
return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
});
auto range = std::views::iota(begin, end + 1/*exclusive*/);
return parallel_for_each(range, [this, options, func = std::move(func_futurized)] (unsigned s) {
return smp::submit_to(s, options, [this, func] {
return func(*get_local_service());
});
});
} catch(...) {
return current_exception_as_future();
}
}

template <typename Service>
template <typename Func, typename... Args>
requires std::invocable<Func, Service&, Args...>
&& std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
inline
future<>
sharded<Service>::invoke_on(unsigned begin, unsigned end, Func func, Args... args) noexcept {
try {
return invoke_on(begin, end, smp_submit_to_options{}, std::move(func), std::move(args)...);
} catch(...) {
return current_exception_as_future();
}
}

template <typename Service>
const Service& sharded<Service>::local() const noexcept {
assert(local_is_initialized());
Expand Down
1 change: 1 addition & 0 deletions src/seastar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ module;
#include <optional>
#include <queue>
#include <random>
#include <ranges>
#include <regex>
#include <source_location>
#include <sstream>
Expand Down
35 changes: 35 additions & 0 deletions tests/unit/sharded_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <seastar/testing/thread_test_case.hh>

#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>

using namespace seastar;

Expand Down Expand Up @@ -221,3 +222,37 @@ SEASTAR_THREAD_TEST_CASE(invoke_on_modifiers) {

srv.stop().get();
}

class int_reader {
int data = 1024;
public:
future<int> read() {
return make_ready_future<int>(data);
}
};

class int_writer {
int data;
public:
future<> write(int data) {
this->data = data;
return make_ready_future<>();
}
};

SEASTAR_THREAD_TEST_CASE(invoke_on_range) {
seastar::sharded<int_reader> sharded_reader;
seastar::sharded<int_writer> sharded_writer;

sharded_reader.start().get();
sharded_writer.start().get();

int mid = seastar::smp::count / 2;
auto f1 = sharded_reader.invoke_on(0, std::max(mid - 1, 0), [] (int_reader& s) { (void)s.read().get(); });
auto f2 = sharded_writer.invoke_on(mid, seastar::smp::count - 1, [] (int_writer& s) { s.write(1024).get(); });
f1.get();
f2.get();

sharded_reader.stop().get();
sharded_writer.stop().get();
}

0 comments on commit 6ce5244

Please sign in to comment.