diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh
index cddbe84b123..5b2fc68bbc1 100644
--- a/include/seastar/core/io_queue.hh
+++ b/include/seastar/core/io_queue.hh
@@ -105,9 +105,12 @@ private:
     friend struct ::io_queue_for_tests;
     friend const fair_group& internal::get_fair_group(const io_queue& ioq, unsigned stream);
 
+    unsigned get_request_target(const internal::io_request&) const noexcept;
+
     priority_class_data& find_or_create_class(internal::priority_class pc);
     future<size_t> queue_request(internal::priority_class pc, internal::io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept;
     future<size_t> queue_one_request(internal::priority_class pc, internal::io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept;
+    future<size_t> queue_one_request_locally(internal::priority_class pc, internal::io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept;
 
     // The fields below are going away, they are just here so we can implement deprecated
     // functions that used to be provided by the fair_queue and are going away (from both
@@ -157,6 +160,7 @@ public:
         unsigned flow_ratio_ticks = 100;
         double flow_ratio_ema_factor = 0.95;
         double flow_ratio_backpressure_threshold = 1.1;
+        unsigned request_fanout_block_bits = 24; // 16MB
     };
 
     io_queue(io_group_ptr group, internal::io_sink& sink);
diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc
index cd4b838df94..3c6bd467ce8 100644
--- a/src/core/io_queue.cc
+++ b/src/core/io_queue.cc
@@ -941,7 +941,7 @@ io_queue::request_limits io_queue::get_request_limits() const noexcept {
     return l;
 }
 
-future<size_t> io_queue::queue_one_request(internal::priority_class pc, io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept {
+future<size_t> io_queue::queue_one_request_locally(internal::priority_class pc, io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept {
     return futurize_invoke([pc = std::move(pc), dnl = std::move(dnl), req = std::move(req), this, intent, iovs = std::move(iovs)] () mutable {
         // First time will hit here, and then we create the class. It is important
         // that we create the shared pointer in the same shard it will be used at later.
@@ -962,6 +962,54 @@ future<size_t> io_queue::queue_one_request(internal::priority_class pc, io_direc
     });
 }
 
+// Maps a request to the shard that should dispatch it. The file offset is split
+// into blocks of (1 << request_fanout_block_bits) bytes and the block index is
+// taken modulo smp::count, so requests hitting one block pick the same shard.
+unsigned io_queue::get_request_target(const internal::io_request& req) const noexcept {
+    // Keep the offset at full 64-bit width: narrowing it to a 32-bit unsigned
+    // would drop the high bits of offsets at or past 4GiB before the shift.
+    uint64_t pos = 0;
+
+    switch (req.opcode()) {
+    case internal::io_request::operation::read:
+        pos = req.as<internal::io_request::operation::read>().pos;
+        break;
+    case internal::io_request::operation::write:
+        pos = req.as<internal::io_request::operation::write>().pos;
+        break;
+    case internal::io_request::operation::readv:
+        pos = req.as<internal::io_request::operation::readv>().pos;
+        break;
+    case internal::io_request::operation::writev:
+        pos = req.as<internal::io_request::operation::writev>().pos;
+        break;
+    default:
+        // cannot happen, but let's stay on the safe side: a request without a
+        // file offset is kept on the submitting shard. (Feeding the shard id
+        // through the block computation below would always yield shard 0.)
+        return this_shard_id();
+    }
+
+    return static_cast<unsigned>((pos >> get_config().request_fanout_block_bits) % smp::count);
+}
+
+// Queues the request on the shard selected by get_request_target(): locally when
+// that is the current shard, otherwise via smp::submit_to() on the io_queue that
+// the same group holds for the target shard.
+future<size_t> io_queue::queue_one_request(internal::priority_class pc, io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept {
+    unsigned target = get_request_target(req);
+    if (target == this_shard_id()) {
+        return queue_one_request_locally(std::move(pc), std::move(dnl), std::move(req), intent, std::move(iovs));
+    }
+
+    // NOTE(review): the intent is not forwarded to the remote shard (nullptr is
+    // passed instead), so a request that fans out cannot be cancelled through
+    // it -- confirm this is acceptable for callers that rely on cancellation.
+    return smp::submit_to(target, [pc = std::move(pc), dnl = std::move(dnl), req = std::move(req), iovs = std::move(iovs), g = _group] () mutable {
+        return g->_io_queues[this_shard_id()]->queue_one_request_locally(std::move(pc), std::move(dnl), std::move(req), nullptr, std::move(iovs));
+    });
+}
+
 future<size_t> io_queue::queue_request(internal::priority_class pc, io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept {
     size_t max_length = _group->_max_request_length[dnl.rw_idx()];