fair_queue: Introduce group-wide capacity balancing
On each shard, classes compete with each other by accumulating the sum of
request costs that have been dispatched from them so far. The cost is the
request capacity divided by the class shares. The dispatch loop then selects
the class with the smallest accumulated value, thus providing shares-aware
fairness -- the larger the shares value, the slower the accumulator grows
and the more requests are picked from the class for dispatch.
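
A minimal C++ sketch of this per-shard selection (the names demo_class,
pick_next and charge are illustrative only, not Seastar's actual types or API):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Illustrative sketch only: demo_class, pick_next and charge are
// hypothetical names, not Seastar's actual API.
struct demo_class {
    uint32_t shares;
    uint64_t accumulated = 0;   // sum of (request capacity / shares) dispatched so far
};

// The dispatch loop picks the class with the smallest accumulated cost.
demo_class* pick_next(std::vector<demo_class>& classes) {
    auto it = std::min_element(classes.begin(), classes.end(),
        [](const demo_class& a, const demo_class& b) {
            return a.accumulated < b.accumulated;
        });
    return it == classes.end() ? nullptr : &*it;
}

// Charging a dispatched request: the larger the shares, the smaller the
// increment, so a high-share class gets picked more often.
void charge(demo_class& c, uint64_t request_capacity) {
    c.accumulated += request_capacity / c.shares;
}
```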

This patch implements a similar approach across shards. For that, each
shard accumulates the dispatched cost from all of its classes, and the IO
group keeps a vector of these accumulated costs, one entry per shard. When
a shard wants to dispatch, it first checks whether it has run too far ahead
of all other shards, and if it has, it skips the dispatch loop.
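
A hedged sketch of that cross-shard check (demo_group and may_dispatch are
illustrative names; the real implementation is in fair_group/fair_queue in
the diff below):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Illustrative sketch only: demo_group and may_dispatch are hypothetical
// names; the real logic lives in fair_group/fair_queue below.
struct demo_group {
    std::vector<uint64_t> balance;  // accumulated dispatched cost, one entry per shard

    uint64_t current_balance() const {
        // the "slowest" shard defines the group-wide baseline
        return *std::min_element(balance.begin(), balance.end());
    }
};

// A shard enters its dispatch loop only if it has not run too far ahead
// of the slowest shard in the group.
bool may_dispatch(const demo_group& g, uint64_t my_accumulated, uint64_t max_imbalance) {
    return my_accumulated <= g.current_balance() + max_imbalance;
}
```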

Corner case -- when a queue gets drained, it "withdraws" itself from
other shards' decisions by advancing its group counter to infinity.
Respectively, when a queue becomes active again, it may fast-forward its
accumulator so as not to get too large an advantage over other shards.
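
A sketch of this corner case under the same illustrative naming (withdraw
and rejoin are hypothetical helpers; the actual handling is in
push_priority_class_from_idle() and dispatch_requests() below):

```cpp
#include <algorithm>
#include <cstdint>
#include <limits>
#include <vector>

// Illustrative sketch only: names are hypothetical, not Seastar's API.
constexpr uint64_t demo_max_balance = std::numeric_limits<int64_t>::max();

// On drain: the shard stops influencing other shards' decisions by
// publishing an "infinite" balance entry.
void withdraw(std::vector<uint64_t>& group_balance, unsigned shard_id) {
    group_balance[shard_id] = demo_max_balance;
}

// On becoming active again: fast-forward the local accumulator towards the
// group minimum so the returning shard doesn't start with a huge head start
// over the shards that kept dispatching.
uint64_t rejoin(uint64_t my_accumulated, const std::vector<uint64_t>& group_balance,
                uint64_t max_imbalance) {
    uint64_t min_bal = *std::min_element(group_balance.begin(), group_balance.end());
    if (min_bal == demo_max_balance) {
        return my_accumulated;  // everyone is idle, nothing to catch up with
    }
    uint64_t floor_val = min_bal > max_imbalance ? min_bal - max_imbalance : 0;
    return std::max(my_accumulated, floor_val);
}
```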

When scheduling classes, a shard has exclusive access to them and uses a
log-complexity heap to pick the one with the smallest consumption counter.
Cross-shard balancing cannot afford that. Instead, each shard manipulates
only its own counter, and to compare it with the other shards' counters it
scans the whole vector, which is not very cache-friendly and is race-prone.

Signed-off-by: Pavel Emelyanov <[email protected]>
xemul committed Jun 13, 2024
1 parent 5d245d6 commit b47a932
Showing 2 changed files with 66 additions and 0 deletions.
25 changes: 25 additions & 0 deletions include/seastar/core/fair_queue.hh
@@ -219,8 +219,20 @@ private:
token_bucket_t _token_bucket;
const capacity_t _per_tick_threshold;

// Capacities accumulated by queues in this group. Each queue tries not
// to run too far ahead of the others; if it does, it skips the dispatch
// loop until the next tick in the hope that other shards will grab the
// unused disk capacity and move their counters forward.
std::vector<capacity_t> _balance;

public:

// Maximum value the _balance entry can get.
// It's also set when a queue goes idle and doesn't need to participate
// in the accumulation race. This value is still suitable for comparisons
// with "active" queues.
static constexpr capacity_t max_balance = std::numeric_limits<signed_capacity_t>::max();

// Convert internal capacity value back into the real token
static double capacity_tokens(capacity_t cap) noexcept {
return (double)cap / fixed_point_factor / token_bucket_t::rate_cast(std::chrono::seconds(1)).count();
@@ -267,6 +279,10 @@ public:
}

const token_bucket_t& token_bucket() const noexcept { return _token_bucket; }

capacity_t current_balance() const noexcept;
void update_balance(capacity_t) noexcept;
void reset_balance() noexcept;
};

/// \brief Fair queuing class
@@ -332,6 +348,15 @@ private:
std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
size_t _nr_classes = 0;
capacity_t _last_accumulated = 0;
capacity_t _total_accumulated = 0;
// Maximum capacity by which a queue can stay behind the other shards
//
// This is similar to the priority classes' fall-back deviation and is
// calculated as the number of capacity points a class with 1 share
// accumulates over tau
//
// Check the max_deviation math in push_priority_class_from_idle()
const capacity_t _max_imbalance;

/*
* When the shared capacity is over the local queue delays
41 changes: 41 additions & 0 deletions src/core/fair_queue.cc
Expand Up @@ -48,6 +48,8 @@ module seastar;

namespace seastar {

logger fq_log("fair_queue");

static_assert(sizeof(fair_queue_ticket) == sizeof(uint64_t), "unexpected fair_queue_ticket size");
static_assert(sizeof(fair_queue_entry) <= 3 * sizeof(void*), "unexpected fair_queue_entry::_hook size");
static_assert(sizeof(fair_queue_entry::container_list_t) == 2 * sizeof(void*), "unexpected priority_class::_queue size");
@@ -108,6 +110,7 @@ fair_group::fair_group(config cfg, unsigned nr_queues)
tokens_capacity(cfg.min_tokens)
)
, _per_tick_threshold(_token_bucket.limit() / nr_queues)
, _balance(smp::count, max_balance)
{
if (tokens_capacity(cfg.min_tokens) > _token_bucket.threshold()) {
throw std::runtime_error("Fair-group replenisher limit is lower than threshold");
@@ -137,6 +140,22 @@ auto fair_group::capacity_deficiency(capacity_t from) const noexcept -> capacity
return _token_bucket.deficiency(from);
}

auto fair_group::current_balance() const noexcept -> capacity_t {
return *std::min_element(_balance.begin(), _balance.end());
}

void fair_group::update_balance(capacity_t acc) noexcept {
_balance[this_shard_id()] = acc;
}

void fair_group::reset_balance() noexcept {
// Request cost can be up to a few million. Given a 100K IOPS disk and a
// class with 1 share, the 64-bit accumulating counter would overflow
// once in a few months. Not extremely comfortable, but still worth
// keeping in mind.
on_internal_error_noexcept(fq_log, "cannot reset group balance");
}

// Priority class, to be used with a given fair_queue
class fair_queue::priority_class_data {
friend class fair_queue;
@@ -165,7 +184,11 @@ fair_queue::fair_queue(fair_group& group, config cfg)
: _config(std::move(cfg))
, _group(group)
, _group_replenish(clock_type::now())
, _max_imbalance(fair_group::fixed_point_factor * fair_group::token_bucket_t::rate_cast(_config.tau).count())
{
if (fair_group::max_balance > std::numeric_limits<capacity_t>::max() - _max_imbalance) {
throw std::runtime_error("Too large tau parameter");
}
}

fair_queue::~fair_queue() {
@@ -194,6 +217,12 @@ void fair_queue::push_priority_class_from_idle(priority_class_data& pc) noexcept
// over signed maximum (see overflow check below)
pc._accumulated = std::max<signed_capacity_t>(_last_accumulated - max_deviation, pc._accumulated);
_handles.assert_enough_capacity();
if (_handles.empty()) {
capacity_t balance = _group.current_balance();
if (balance != fair_group::max_balance) {
_total_accumulated = std::max<signed_capacity_t>(balance - _max_imbalance, _total_accumulated);
}
}
_handles.push(&pc);
pc._queued = true;
}
@@ -334,6 +363,11 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
capacity_t dispatched = 0;
boost::container::small_vector<priority_class_ptr, 2> preempt;

capacity_t balance = _group.current_balance();
if (_total_accumulated > balance + _max_imbalance) {
return;
}

while (!_handles.empty() && (dispatched < _group.per_tick_grab_threshold())) {
priority_class_data& h = *_handles.top();
if (h._queue.empty() || !h._plugged) {
Expand Down Expand Up @@ -376,6 +410,7 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
}
_last_accumulated = 0;
}
_total_accumulated += req_cost;
h._accumulated += req_cost;
h._pure_accumulated += req_cap;
dispatched += req_cap;
@@ -390,6 +425,12 @@ void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
for (auto&& h : preempt) {
push_priority_class(*h);
}

if (_total_accumulated >= fair_group::max_balance) {
_group.reset_balance();
_total_accumulated = 0;
}
_group.update_balance(_handles.empty() ? fair_group::max_balance : _total_accumulated);
}

std::vector<seastar::metrics::impl::metric_definition_impl> fair_queue::metrics(class_id c) {