From 5cfcc796b9b292f341f2a4036d2558dedb1352e1 Mon Sep 17 00:00:00 2001 From: tomershafir Date: Sat, 21 Sep 2024 20:22:12 +0300 Subject: [PATCH] sharded.hh: add invoke_on variant for a shard range --- include/seastar/core/sharded.hh | 83 ++++++++++++++++++++++++++++++++- tests/unit/sharded_test.cc | 35 ++++++++++++++ 2 files changed, 116 insertions(+), 2 deletions(-) diff --git a/include/seastar/core/sharded.hh b/include/seastar/core/sharded.hh index 36a58811abc..f4f41745f5d 100644 --- a/include/seastar/core/sharded.hh +++ b/include/seastar/core/sharded.hh @@ -35,6 +35,8 @@ #include #include #include +#include +#include #endif /// \defgroup smp-module Multicore @@ -190,7 +192,7 @@ class sharded { }; std::vector _instances; private: - using invoke_on_all_func_type = std::function (Service&)>; + using invoke_on_multiple_func_type = std::function (Service&)>; private: template friend struct shared_ptr_make_helper; @@ -338,6 +340,42 @@ public: } } + /// Invoke a callable on a range of instances of `Service`. + /// + /// \param begin shard id that begins the range + /// \param end shard id that ends the range (inclusive) + /// \param options the options to forward to the \ref smp::submit_to() + /// called behind the scenes. + /// \param func a callable with signature `Value (Service&, Args...)` or + /// `future (Service&, Args...)` (for some `Value` type), or a pointer + /// to a member function of Service + /// \param args parameters to the callable; will be copied or moved. To pass by reference, + /// use std::ref(). + /// \return Future that becomes ready once all calls have completed + template + requires std::invocable + && std::is_same_v...>>, future<>> + future<> + invoke_on(unsigned begin, unsigned end, smp_submit_to_options options, Func func, Args... args) noexcept; + + /// Invoke a callable on a range of instances of `Service`. + /// Passes the default \ref smp_submit_to_options to the + /// \ref smp::submit_to() called behind the scenes. + /// + /// \param begin shard id that begins the range + /// \param end shard id that ends the range + /// \param func a callable with signature `Value (Service&, Args...)` or + /// `future (Service&, Args...)` (for some `Value` type), or a pointer + /// to a member function of Service + /// \param args parameters to the callable; will be copied or moved. To pass by reference, + /// use std::ref(). + /// \return Future that becomes ready once all calls have completed + template + requires std::invocable + && std::is_same_v...>>, future<>> + future<> + invoke_on(unsigned begin, unsigned end, Func func, Args... args) noexcept; + /// Invoke a callable on all instances of `Service` and reduce the results using /// `Reducer`. /// @@ -761,7 +799,7 @@ sharded::invoke_on_all(smp_submit_to_options options, Func func, Args.. static_assert(std::is_same_v...>>, future<>>, "invoke_on_all()'s func must return void or future<>"); try { - return invoke_on_all(options, invoke_on_all_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable { + return invoke_on_all(options, invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable { return std::apply([&service, &func] (Args&&... args) mutable { return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward(args))...))); }, std::move(args)); @@ -788,6 +826,47 @@ sharded::invoke_on_others(smp_submit_to_options options, Func func, Arg } } +template +template +requires std::invocable + && std::is_same_v...>>, future<>> +inline +future<> +sharded::invoke_on(unsigned begin, unsigned end, smp_submit_to_options options, Func func, Args... args) noexcept { + try { + if (begin > end || end > seastar::smp::count - 1) { + throw std::invalid_argument(format("Invalid range: [{},{}]. Begin must not be greater than end, and end cannot be greater than the largest shard number", begin, end)); + } + auto func_futurized = invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable { + // Avoid false-positive unused-lambda-capture warning on Clang + (void)args; + return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward(args))...))); + }); + auto range = std::ranges::views::iota(begin, end + 1/*exclusive*/); + return parallel_for_each(range, [this, options, func = std::move(func_futurized)] (unsigned s) { + return smp::submit_to(s, options, [this, func] { + return func(*get_local_service()); + }); + }); + } catch(...) { + return current_exception_as_future(); + } +} + +template +template +requires std::invocable + && std::is_same_v...>>, future<>> +inline +future<> +sharded::invoke_on(unsigned begin, unsigned end, Func func, Args... args) noexcept { + try { + return invoke_on(begin, end, smp_submit_to_options{}, std::move(func), std::move(args)...); + } catch(...) { + return current_exception_as_future(); + } +} + template const Service& sharded::local() const noexcept { assert(local_is_initialized()); diff --git a/tests/unit/sharded_test.cc b/tests/unit/sharded_test.cc index 3e58036bf9a..ca36793cd23 100644 --- a/tests/unit/sharded_test.cc +++ b/tests/unit/sharded_test.cc @@ -22,6 +22,7 @@ #include #include +#include using namespace seastar; @@ -221,3 +222,37 @@ SEASTAR_THREAD_TEST_CASE(invoke_on_modifiers) { srv.stop().get(); } + +class int_reader { + int data = 1024; +public: + future read() { + return make_ready_future(data); + } +}; + +class int_writer { + int data; +public: + future<> write(int data) { + this->data = data; + return make_ready_future<>(); + } +}; + +SEASTAR_THREAD_TEST_CASE(invoke_on_range) { + seastar::sharded sharded_reader; + seastar::sharded sharded_writer; + + sharded_reader.start().get(); + sharded_writer.start().get(); + + int mid = seastar::smp::count / 2; + auto f1 = sharded_reader.invoke_on(0, std::max(mid - 1, 0), [] (int_reader& s) { (void)s.read().get(); }); + auto f2 = sharded_writer.invoke_on(mid, seastar::smp::count - 1, [] (int_writer& s) { s.write(1024).get(); }); + f1.get(); + f2.get(); + + sharded_reader.stop().get(); + sharded_writer.stop().get(); +}