Move output IO throttler to IO queue level #2332

Draft
wants to merge 12 commits into
base: master
from
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -691,6 +691,7 @@ add_library (seastar
src/core/alien.cc
src/core/file.cc
src/core/fair_queue.cc
src/core/throttle.cc
src/core/reactor_backend.cc
src/core/thread_pool.cc
src/core/app-template.cc
2 changes: 1 addition & 1 deletion apps/io_tester/ioinfo.cc
@@ -74,7 +74,7 @@ int main(int ac, char** av) {
}
out << YAML::EndMap;

const auto& fg = internal::get_fair_group(ioq, internal::io_direction_and_length::write_idx);
const auto& fg = internal::io_throttle(ioq, internal::io_direction_and_length::write_idx);
out << YAML::Key << "per_tick_grab_threshold" << YAML::Value << fg.per_tick_grab_threshold();

const auto& tb = fg.token_bucket();
196 changes: 10 additions & 186 deletions include/seastar/core/fair_queue.hh
@@ -26,7 +26,7 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/circular_buffer.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/util/shared_token_bucket.hh>
#include <seastar/core/internal/throttle.hh>

#include <chrono>
#include <cstdint>
@@ -114,6 +114,10 @@ private:
capacity_t _capacity;
bi::slist_member_hook<> _hook;

protected:
~fair_queue_entry() = default;
void cancel() noexcept { _capacity = 0; }

public:
fair_queue_entry(capacity_t c) noexcept
: _capacity(c) {}
@@ -123,148 +127,8 @@ public:
bi::member_hook<fair_queue_entry, bi::slist_member_hook<>, &fair_queue_entry::_hook>>;

capacity_t capacity() const noexcept { return _capacity; }
};

/// \brief Group of queues class
///
/// This is a fair group. It's attached by one or more fair queues. On machines having a
/// big* amount of shards, queues use the group to borrow/lend the needed capacity for
/// requests dispatching.
///
/// * Big means that when all shards submit requests all together the disk is unable to
/// dispatch them efficiently. The inability can be of two kinds -- either the disk cannot
/// cope with the number of arriving requests, or the total size of the data within
/// the given time frame exceeds the disk throughput.
class fair_group {
public:
using capacity_t = fair_queue_entry::capacity_t;
using clock_type = std::chrono::steady_clock;

/*
* tldr; The math
*
* Bw, Br -- write/read bandwidth (bytes per second)
* Ow, Or -- write/read iops (ops per second)
*
* xx_max -- their maximum values (configured)
*
* Throttling formula:
*
* Bw/Bw_max + Br/Br_max + Ow/Ow_max + Or/Or_max <= K
*
* where K is the scalar value <= 1.0 (also configured)
*
* Bandwidth is bytes time derivative, iops is ops time derivative, i.e.
* Bx = d(bx)/dt, Ox = d(ox)/dt. Then the formula turns into
*
* d(bw/Bw_max + br/Br_max + ow/Ow_max + or/Or_max)/dt <= K
*
* Fair queue tickets are {w, s} weight-size pairs that are
*
* s = read_base_count * br, for reads
* Br_max/Bw_max * read_base_count * bw, for writes
*
* w = read_base_count, for reads
* Or_max/Ow_max * read_base_count, for writes
*
* Thus the formula turns into
*
* d(sum(w/W + s/S))/dt <= K
*
* where {w, s} is the ticket value of a request and sum summarizes the
* ticket values from all the requests seen so far, {W, S} is the ticket
* value that corresponds to a virtual summary of Or_max requests of
* Br_max size total.
*/

/*
* The normalization results in a float of the 2^-30 seconds order of
* magnitude. Not to invent floating-point atomic arithmetic, the result
* is converted to an integer by multiplying by a factor that's large
* enough to turn these values into a non-zero integer.
*
* Also, the rates in bytes/sec when adjusted by io-queue according to
* multipliers become too large to be stored in 32-bit ticket value.
* Thus the rate resolution is applied. The t.bucket is configured with a
* time period for which the speeds from F (in above formula) are taken.
*/

static constexpr float fixed_point_factor = float(1 << 24);
using rate_resolution = std::milli;
using token_bucket_t = internal::shared_token_bucket<capacity_t, rate_resolution, internal::capped_release::no>;

private:

/*
* The dF/dt <= K limitation is managed by the modified token bucket
* algo where tokens are ticket.normalize(cost_capacity), the refill
* rate is K.
*
* The token bucket algo must have the limit on the number of tokens
* accumulated. Here it's configured so that it accumulates for the
* latency_goal duration.
*
* The replenish threshold is the minimal number of tokens to put back.
* It's reserved for future use to reduce the load on the replenish
* timestamp.
*
* The timestamp, in turn, is the time when the bucket was replenished
* last. Every time a shard tries to get tokens from bucket it first
* tries to convert the time that had passed since this timestamp
* into more tokens in the bucket.
*/

token_bucket_t _token_bucket;
const capacity_t _per_tick_threshold;

public:

// Convert internal capacity value back into the real token
static double capacity_tokens(capacity_t cap) noexcept {
return (double)cap / fixed_point_factor / token_bucket_t::rate_cast(std::chrono::seconds(1)).count();
}

// Convert floating-point tokens into the token bucket capacity
static capacity_t tokens_capacity(double tokens) noexcept {
return tokens * token_bucket_t::rate_cast(std::chrono::seconds(1)).count() * fixed_point_factor;
}

auto capacity_duration(capacity_t cap) const noexcept {
return _token_bucket.duration_for(cap);
}

struct config {
sstring label = "";
/*
* There are two "min" values that can be configured. The former one
* is the minimal weight:size pair that the upper layer is going to
* submit. However, it can submit _larger_ values, and the fair queue
* must accept those as large as the latter pair (but it can accept
* even larger values, of course)
*/
double min_tokens = 0.0;
double limit_min_tokens = 0.0;
std::chrono::duration<double> rate_limit_duration = std::chrono::milliseconds(1);
};

explicit fair_group(config cfg, unsigned nr_queues);
fair_group(fair_group&&) = delete;

capacity_t maximum_capacity() const noexcept { return _token_bucket.limit(); }
capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; }
capacity_t grab_capacity(capacity_t cap) noexcept;
clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); }
void replenish_capacity(clock_type::time_point now) noexcept;
void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept;

capacity_t capacity_deficiency(capacity_t from) const noexcept;

std::chrono::duration<double> rate_limit_duration() const noexcept {
std::chrono::duration<double, rate_resolution> dur((double)_token_bucket.limit() / _token_bucket.rate());
return std::chrono::duration_cast<std::chrono::duration<double>>(dur);
}

const token_bucket_t& token_bucket() const noexcept { return _token_bucket; }
virtual throttle::grab_result can_dispatch() const noexcept = 0;
virtual void dispatch() = 0;
};
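
For anyone tracing where the math from the removed `fair_group` comment ends up, the following is a minimal standalone sketch of the throttling inequality and the fixed-point token conversion it describes. It does not use Seastar. `io_rates`, `within_throttle_limit` and `ticks_per_second` are hypothetical names; `capacity_t` is assumed to be a 64-bit unsigned integer; and the value 1000 assumes that `token_bucket_t::rate_cast(std::chrono::seconds(1)).count()` counts milliseconds because `rate_resolution` is `std::milli`.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for measured (or configured maximum) rates.
// Not a Seastar type.
struct io_rates {
    double write_bw;    // Bw, bytes per second
    double read_bw;     // Br, bytes per second
    double write_iops;  // Ow, ops per second
    double read_iops;   // Or, ops per second
};

// Checks Bw/Bw_max + Br/Br_max + Ow/Ow_max + Or/Or_max <= K
inline bool within_throttle_limit(const io_rates& cur, const io_rates& max, double k) {
    const double load = cur.write_bw / max.write_bw
                      + cur.read_bw / max.read_bw
                      + cur.write_iops / max.write_iops
                      + cur.read_iops / max.read_iops;
    return load <= k;
}

// Assumption: the capacity type is a 64-bit unsigned integer.
using capacity_t = uint64_t;

// Same factor as in the removed code (the original stores it as float).
constexpr double fixed_point_factor = double(1 << 24);

// Assumption: one second expressed in the bucket's rate resolution (std::milli).
constexpr double ticks_per_second = 1000.0;

// Floating-point tokens -> integer token bucket capacity
inline capacity_t tokens_capacity(double tokens) {
    return static_cast<capacity_t>(tokens * ticks_per_second * fixed_point_factor);
}

// Integer token bucket capacity -> floating-point tokens
inline double capacity_tokens(capacity_t cap) {
    return static_cast<double>(cap) / fixed_point_factor / ticks_per_second;
}
```

Under these assumptions one token maps to 1000 * 2^24 capacity units, which keeps small per-request costs non-zero after conversion to an integer.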

/// \brief Fair queuing class
@@ -299,7 +163,7 @@ public:

using class_id = unsigned int;
class priority_class_data;
using capacity_t = fair_group::capacity_t;
using capacity_t = fair_queue_entry::capacity_t;
using signed_capacity_t = std::make_signed_t<capacity_t>;

private:
@@ -322,49 +186,24 @@ private:
};

config _config;
fair_group& _group;
clock_type::time_point _group_replenish;
fair_queue_ticket _resources_executing;
fair_queue_ticket _resources_queued;
priority_queue _handles;
std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
size_t _nr_classes = 0;
capacity_t _last_accumulated = 0;

/*
* When the shared capacity is over, the local queue delays
* further dispatching until better times
*
* \head -- the value group head rover is expected to cross
* \cap -- the capacity that's accounted on the group
*
* The last field is needed to "rearm" the wait in case
* queue decides that it wants to dispatch another capacity
* in the middle of the waiting
*/
struct pending {
capacity_t head;
capacity_t cap;

pending(capacity_t t, capacity_t c) noexcept : head(t), cap(c) {}
};

std::optional<pending> _pending;

void push_priority_class(priority_class_data& pc) noexcept;
void push_priority_class_from_idle(priority_class_data& pc) noexcept;
void pop_priority_class(priority_class_data& pc) noexcept;
void plug_priority_class(priority_class_data& pc) noexcept;
void unplug_priority_class(priority_class_data& pc) noexcept;

enum class grab_result { grabbed, cant_preempt, pending };
grab_result grab_capacity(const fair_queue_entry& ent) noexcept;
grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept;
public:
/// Constructs a fair queue with configuration parameters \c cfg.
///
/// \param cfg an instance of the class \ref config
explicit fair_queue(fair_group& shared, config cfg);
explicit fair_queue(config cfg);
fair_queue(fair_queue&&) = delete;
~fair_queue();

@@ -388,14 +227,6 @@ public:
/// \return the amount of resources (weight, size) currently executing
fair_queue_ticket resources_currently_executing() const;

capacity_t tokens_capacity(double tokens) const noexcept {
return _group.tokens_capacity(tokens);
}

capacity_t maximum_capacity() const noexcept {
return _group.maximum_capacity();
}

/// Queue the entry \c ent through this class' \ref fair_queue
///
/// The user of this interface is supposed to call \ref notify_requests_finished when the
@@ -405,15 +236,8 @@ public:
void plug_class(class_id c) noexcept;
void unplug_class(class_id c) noexcept;

/// Notifies that one request finished
/// \param desc an instance of \c fair_queue_ticket structure describing the request that just finished.
void notify_request_finished(fair_queue_entry::capacity_t cap) noexcept;
void notify_request_cancelled(fair_queue_entry& ent) noexcept;

/// Try to execute new requests if there is capacity left in the queue.
void dispatch_requests(std::function<void(fair_queue_entry&)> cb);

clock_type::time_point next_pending_aio() const noexcept;
void dispatch_requests();

std::vector<seastar::metrics::impl::metric_definition_impl> metrics(class_id c);
};
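
To illustrate the interface change in this file, here is a rough standalone sketch of how entries with `can_dispatch()` and `dispatch()` virtuals could replace the callback that `dispatch_requests()` used to take. It is not the code from this PR. `throttle::grab_result` is not shown in this diff, so the two-value `grab_result` enum below is a hypothetical stand-in, and `entry`, `budget`, `request` and `dispatch_all` are illustrative names only.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for throttle::grab_result, whose real values
// are not visible in this diff.
enum class grab_result { ok, stop };

// Simplified analogue of fair_queue_entry after the change: the entry
// itself decides whether it can go and how it is dispatched.
class entry {
    uint64_t _capacity;
protected:
    ~entry() = default;                        // entries are not deleted via base pointer
    void cancel() noexcept { _capacity = 0; }  // a cancelled entry costs nothing
public:
    explicit entry(uint64_t c) noexcept : _capacity(c) {}
    uint64_t capacity() const noexcept { return _capacity; }
    virtual grab_result can_dispatch() const noexcept = 0;
    virtual void dispatch() = 0;
};

// Hypothetical shared budget standing in for the throttler.
struct budget {
    uint64_t left;
};

class request final : public entry {
    budget& _budget;
    bool _dispatched = false;
public:
    request(uint64_t cap, budget& b) noexcept : entry(cap), _budget(b) {}
    grab_result can_dispatch() const noexcept override {
        return capacity() <= _budget.left ? grab_result::ok : grab_result::stop;
    }
    void dispatch() override {
        _budget.left -= capacity();
        _dispatched = true;
    }
    bool dispatched() const noexcept { return _dispatched; }
    void drop() noexcept { cancel(); }
};

// Analogue of the callback-free dispatch_requests(): dispatch from the
// head of the queue until an entry reports it cannot go. Returns the
// number of entries dispatched.
inline unsigned dispatch_all(std::deque<entry*>& q) {
    unsigned n = 0;
    while (!q.empty() && q.front()->can_dispatch() == grab_result::ok) {
        q.front()->dispatch();
        q.pop_front();
        ++n;
    }
    return n;
}
```

With a budget of 10 and three requests of capacity 4, the loop dispatches two and leaves the third queued. If that third request is then cancelled through `drop()`, its capacity becomes 0 and the next pass dispatches it without consuming any budget.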