From 0a84c804a7cfd46cb34258836fad5462f54f81dc Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Mon, 10 Jun 2024 10:40:38 +0300 Subject: [PATCH] io_queue: Distribute IO request among shards Scatter the IO so that every 16MB chunk gets served from the same shard. This should keep IO locality as well as provide good parallelism even for 1Gb files on up to 64 shards. Signed-off-by: Pavel Emelyanov --- include/seastar/core/io_queue.hh | 4 ++++ src/core/io_queue.cc | 37 +++++++++++++++++++++++++++++++- 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh index cddbe84b123..5b2fc68bbc1 100644 --- a/include/seastar/core/io_queue.hh +++ b/include/seastar/core/io_queue.hh @@ -105,9 +105,12 @@ private: friend struct ::io_queue_for_tests; friend const fair_group& internal::get_fair_group(const io_queue& ioq, unsigned stream); + unsigned get_request_target(const internal::io_request&) const noexcept; + priority_class_data& find_or_create_class(internal::priority_class pc); future queue_request(internal::priority_class pc, internal::io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept; future queue_one_request(internal::priority_class pc, internal::io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept; + future queue_one_request_locally(internal::priority_class pc, internal::io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept; // The fields below are going away, they are just here so we can implement deprecated // functions that used to be provided by the fair_queue and are going away (from both @@ -157,6 +160,7 @@ public: unsigned flow_ratio_ticks = 100; double flow_ratio_ema_factor = 0.95; double flow_ratio_backpressure_threshold = 1.1; + unsigned request_fanout_block_bits = 24; // 16MB }; io_queue(io_group_ptr group, internal::io_sink& sink); diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc index cd4b838df94..3c6bd467ce8 100644 --- a/src/core/io_queue.cc +++ b/src/core/io_queue.cc @@ -941,7 +941,7 @@ io_queue::request_limits io_queue::get_request_limits() const noexcept { return l; } -future io_queue::queue_one_request(internal::priority_class pc, io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept { +future io_queue::queue_one_request_locally(internal::priority_class pc, io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept { return futurize_invoke([pc = std::move(pc), dnl = std::move(dnl), req = std::move(req), this, intent, iovs = std::move(iovs)] () mutable { // First time will hit here, and then we create the class. It is important // that we create the shared pointer in the same shard it will be used at later. @@ -962,6 +962,41 @@ future io_queue::queue_one_request(internal::priority_class pc, io_direc }); } +unsigned io_queue::get_request_target(const internal::io_request& req) const noexcept { + unsigned pos = this_shard_id(); + + switch (req.opcode()) { + case internal::io_request::operation::read: + pos = req.as().pos; + break; + case internal::io_request::operation::write: + pos = req.as().pos; + break; + case internal::io_request::operation::readv: + pos = req.as().pos; + break; + case internal::io_request::operation::writev: + pos = req.as().pos; + break; + default: + // cannot happen, but let's stay on the safe side + break; + } + + return (pos >> get_config().request_fanout_block_bits) % smp::count; +} + +future io_queue::queue_one_request(internal::priority_class pc, io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept { + unsigned target = get_request_target(req); + if (target == this_shard_id()) { + return queue_one_request_locally(std::move(pc), std::move(dnl), std::move(req), intent, std::move(iovs)); + } + + return smp::submit_to(target, [pc = std::move(pc), dnl = std::move(dnl), req = std::move(req), iovs = std::move(iovs), g = _group] () mutable { + return g->_io_queues[this_shard_id()]->queue_one_request_locally(std::move(pc), std::move(dnl), std::move(req), nullptr, std::move(iovs)); + }); +} + future io_queue::queue_request(internal::priority_class pc, io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept { size_t max_length = _group->_max_request_length[dnl.rw_idx()];