Skip to content

Commit

Permalink
io_queue: Distribute IO request among shards
Browse files Browse the repository at this point in the history
Scatter the IO so that every 16MB chunk gets served from the same shard.
This should keep IO locality as well as provide good parallelism even
for 1Gb files on up to 64 shards.

Signed-off-by: Pavel Emelyanov <[email protected]>
  • Loading branch information
xemul committed Jun 10, 2024
1 parent fc266de commit 0a84c80
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 1 deletion.
4 changes: 4 additions & 0 deletions include/seastar/core/io_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,12 @@ private:
friend struct ::io_queue_for_tests;
friend const fair_group& internal::get_fair_group(const io_queue& ioq, unsigned stream);

unsigned get_request_target(const internal::io_request&) const noexcept;

priority_class_data& find_or_create_class(internal::priority_class pc);
future<size_t> queue_request(internal::priority_class pc, internal::io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept;
future<size_t> queue_one_request(internal::priority_class pc, internal::io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept;
future<size_t> queue_one_request_locally(internal::priority_class pc, internal::io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept;

// The fields below are going away, they are just here so we can implement deprecated
// functions that used to be provided by the fair_queue and are going away (from both
Expand Down Expand Up @@ -157,6 +160,7 @@ public:
unsigned flow_ratio_ticks = 100;
double flow_ratio_ema_factor = 0.95;
double flow_ratio_backpressure_threshold = 1.1;
unsigned request_fanout_block_bits = 24; // 16MB
};

io_queue(io_group_ptr group, internal::io_sink& sink);
Expand Down
37 changes: 36 additions & 1 deletion src/core/io_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ io_queue::request_limits io_queue::get_request_limits() const noexcept {
return l;
}

future<size_t> io_queue::queue_one_request(internal::priority_class pc, io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept {
future<size_t> io_queue::queue_one_request_locally(internal::priority_class pc, io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept {
return futurize_invoke([pc = std::move(pc), dnl = std::move(dnl), req = std::move(req), this, intent, iovs = std::move(iovs)] () mutable {
// First time will hit here, and then we create the class. It is important
// that we create the shared pointer in the same shard it will be used at later.
Expand All @@ -962,6 +962,41 @@ future<size_t> io_queue::queue_one_request(internal::priority_class pc, io_direc
});
}

unsigned io_queue::get_request_target(const internal::io_request& req) const noexcept {
unsigned pos = this_shard_id();

switch (req.opcode()) {
case internal::io_request::operation::read:
pos = req.as<internal::io_request::operation::read>().pos;
break;
case internal::io_request::operation::write:
pos = req.as<internal::io_request::operation::write>().pos;
break;
case internal::io_request::operation::readv:
pos = req.as<internal::io_request::operation::readv>().pos;
break;
case internal::io_request::operation::writev:
pos = req.as<internal::io_request::operation::writev>().pos;
break;
default:
// cannot happen, but let's stay on the safe side
break;
}

return (pos >> get_config().request_fanout_block_bits) % smp::count;
}

future<size_t> io_queue::queue_one_request(internal::priority_class pc, io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept {
unsigned target = get_request_target(req);
if (target == this_shard_id()) {
return queue_one_request_locally(std::move(pc), std::move(dnl), std::move(req), intent, std::move(iovs));
}

return smp::submit_to(target, [pc = std::move(pc), dnl = std::move(dnl), req = std::move(req), iovs = std::move(iovs), g = _group] () mutable {
return g->_io_queues[this_shard_id()]->queue_one_request_locally(std::move(pc), std::move(dnl), std::move(req), nullptr, std::move(iovs));
});
}

future<size_t> io_queue::queue_request(internal::priority_class pc, io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept {
size_t max_length = _group->_max_request_length[dnl.rw_idx()];

Expand Down

0 comments on commit 0a84c80

Please sign in to comment.