Skip to content

Commit

Permalink
fair_queue: Rename fair_group to throttle
Browse files Browse the repository at this point in the history
The class in question only controls the output flow of capacities, it's
not about fair queueing at all. There is an effort to make cross-shard
fairness, that needs fair_group however, but we're not yet there.

refs: scylladb#2294

Signed-off-by: Pavel Emelyanov <[email protected]>
  • Loading branch information
xemul committed Jul 5, 2024
1 parent 497f4dd commit 33e1356
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 35 deletions.
2 changes: 1 addition & 1 deletion apps/io_tester/ioinfo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ int main(int ac, char** av) {
}
out << YAML::EndMap;

const auto& fg = internal::get_fair_group(ioq, internal::io_direction_and_length::write_idx);
const auto& fg = internal::io_throttle(ioq, internal::io_direction_and_length::write_idx);
out << YAML::Key << "per_tick_grab_threshold" << YAML::Value << fg.per_tick_grab_threshold();

const auto& tb = fg.token_bucket();
Expand Down
12 changes: 6 additions & 6 deletions include/seastar/core/fair_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public:
/// dispatch them efficiently. The inability can be of two kinds -- either disk cannot
/// cope with the number of arriving requests, or the total size of the data withing
/// the given time frame exceeds the disk throughput.
class fair_group {
class shared_throttle {
public:
using capacity_t = fair_queue_entry::capacity_t;
using clock_type = std::chrono::steady_clock;
Expand Down Expand Up @@ -247,8 +247,8 @@ public:
std::chrono::duration<double> rate_limit_duration = std::chrono::milliseconds(1);
};

explicit fair_group(config cfg, unsigned nr_queues);
fair_group(fair_group&&) = delete;
explicit shared_throttle(config cfg, unsigned nr_queues);
shared_throttle(shared_throttle&&) = delete;

capacity_t maximum_capacity() const noexcept { return _token_bucket.limit(); }
capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; }
Expand Down Expand Up @@ -299,7 +299,7 @@ public:

using class_id = unsigned int;
class priority_class_data;
using capacity_t = fair_group::capacity_t;
using capacity_t = shared_throttle::capacity_t;
using signed_capacity_t = std::make_signed_t<capacity_t>;

private:
Expand All @@ -322,7 +322,7 @@ private:
};

config _config;
fair_group& _group;
shared_throttle& _group;
clock_type::time_point _group_replenish;
fair_queue_ticket _resources_executing;
fair_queue_ticket _resources_queued;
Expand Down Expand Up @@ -364,7 +364,7 @@ public:
/// Constructs a fair queue with configuration parameters \c cfg.
///
/// \param cfg an instance of the class \ref config
explicit fair_queue(fair_group& shared, config cfg);
explicit fair_queue(shared_throttle& shared, config cfg);
fair_queue(fair_queue&&) = delete;
~fair_queue();

Expand Down
10 changes: 5 additions & 5 deletions include/seastar/core/io_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ namespace seastar {

class io_queue;
namespace internal {
const fair_group& get_fair_group(const io_queue& ioq, unsigned stream);
const shared_throttle& io_throttle(const io_queue& ioq, unsigned stream);
}

#if SEASTAR_API_LEVEL < 7
Expand Down Expand Up @@ -97,7 +97,7 @@ private:
internal::io_sink& _sink;

friend struct ::io_queue_for_tests;
friend const fair_group& internal::get_fair_group(const io_queue& ioq, unsigned stream);
friend const shared_throttle& internal::io_throttle(const io_queue& ioq, unsigned stream);

priority_class_data& find_or_create_class(internal::priority_class pc);
future<size_t> queue_request(internal::priority_class pc, internal::io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept;
Expand Down Expand Up @@ -218,16 +218,16 @@ public:
private:
friend class io_queue;
friend struct ::io_queue_for_tests;
friend const fair_group& internal::get_fair_group(const io_queue& ioq, unsigned stream);
friend const shared_throttle& internal::io_throttle(const io_queue& ioq, unsigned stream);

const io_queue::config _config;
size_t _max_request_length[2];
boost::container::static_vector<fair_group, 2> _fgs;
boost::container::static_vector<shared_throttle, 2> _fgs;
std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
util::spinlock _lock;
const shard_id _allocated_on;

static fair_group::config make_fair_group_config(const io_queue::config& qcfg) noexcept;
static shared_throttle::config make_throttle_config(const io_queue::config& qcfg) noexcept;
priority_class_data& find_or_create_class(internal::priority_class pc);
};

Expand Down
18 changes: 9 additions & 9 deletions src/core/fair_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ fair_queue_ticket wrapping_difference(const fair_queue_ticket& a, const fair_que
std::max<int32_t>(a._size - b._size, 0));
}

fair_group::fair_group(config cfg, unsigned nr_queues)
shared_throttle::shared_throttle(config cfg, unsigned nr_queues)
: _token_bucket(fixed_point_factor,
std::max<capacity_t>(fixed_point_factor * token_bucket_t::rate_cast(cfg.rate_limit_duration).count(), tokens_capacity(cfg.limit_min_tokens)),
tokens_capacity(cfg.min_tokens)
Expand All @@ -114,16 +114,16 @@ fair_group::fair_group(config cfg, unsigned nr_queues)
}
}

auto fair_group::grab_capacity(capacity_t cap) noexcept -> capacity_t {
auto shared_throttle::grab_capacity(capacity_t cap) noexcept -> capacity_t {
assert(cap <= _token_bucket.limit());
return _token_bucket.grab(cap);
}

void fair_group::replenish_capacity(clock_type::time_point now) noexcept {
void shared_throttle::replenish_capacity(clock_type::time_point now) noexcept {
_token_bucket.replenish(now);
}

void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept {
void shared_throttle::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept {
auto now = clock_type::now();
auto extra = _token_bucket.accumulated_in(now - local_ts);

Expand All @@ -133,7 +133,7 @@ void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noex
}
}

auto fair_group::capacity_deficiency(capacity_t from) const noexcept -> capacity_t {
auto shared_throttle::capacity_deficiency(capacity_t from) const noexcept -> capacity_t {
return _token_bucket.deficiency(from);
}

Expand Down Expand Up @@ -161,7 +161,7 @@ bool fair_queue::class_compare::operator() (const priority_class_ptr& lhs, const
return lhs->_accumulated > rhs->_accumulated;
}

fair_queue::fair_queue(fair_group& group, config cfg)
fair_queue::fair_queue(shared_throttle& group, config cfg)
: _config(std::move(cfg))
, _group(group)
, _group_replenish(clock_type::now())
Expand All @@ -187,7 +187,7 @@ void fair_queue::push_priority_class_from_idle(priority_class_data& pc) noexcept
// duration. For this estimate how many capacity units can be
// accumulated with the current class shares per rate resulution
// and scale it up to tau.
capacity_t max_deviation = fair_group::fixed_point_factor / pc._shares * fair_group::token_bucket_t::rate_cast(_config.tau).count();
capacity_t max_deviation = shared_throttle::fixed_point_factor / pc._shares * shared_throttle::token_bucket_t::rate_cast(_config.tau).count();
// On start this deviation can go to negative values, so not to
// introduce extra if's for that short corner case, use signed
// arithmetics and make sure the _accumulated value doesn't grow
Expand Down Expand Up @@ -397,10 +397,10 @@ std::vector<seastar::metrics::impl::metric_definition_impl> fair_queue::metrics(
priority_class_data& pc = *_priority_classes[c];
return std::vector<sm::impl::metric_definition_impl>({
sm::make_counter("consumption",
[&pc] { return fair_group::capacity_tokens(pc._pure_accumulated); },
[&pc] { return shared_throttle::capacity_tokens(pc._pure_accumulated); },
sm::description("Accumulated disk capacity units consumed by this class; an increment per-second rate indicates full utilization")),
sm::make_counter("adjusted_consumption",
[&pc] { return fair_group::capacity_tokens(pc._accumulated); },
[&pc] { return shared_throttle::capacity_tokens(pc._accumulated); },
sm::description("Consumed disk capacity units adjusted for class shares and idling preemption")),
});
}
Expand Down
8 changes: 4 additions & 4 deletions src/core/io_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ sstring io_request::opname() const {
std::abort();
}

const fair_group& get_fair_group(const io_queue& ioq, unsigned stream) {
const shared_throttle& io_throttle(const io_queue& ioq, unsigned stream) {
return ioq._group->_fgs[stream];
}

Expand Down Expand Up @@ -588,8 +588,8 @@ io_queue::io_queue(io_group_ptr group, internal::io_sink& sink)
});
}

fair_group::config io_group::make_fair_group_config(const io_queue::config& qcfg) noexcept {
fair_group::config cfg;
shared_throttle::config io_group::make_throttle_config(const io_queue::config& qcfg) noexcept {
shared_throttle::config cfg;
cfg.label = fmt::format("io-queue-{}", qcfg.devid);
double min_weight = std::min(io_queue::read_request_base_count, qcfg.disk_req_write_to_read_multiplier);
double min_size = std::min(io_queue::read_request_base_count, qcfg.disk_blocks_write_to_read_multiplier);
Expand All @@ -609,7 +609,7 @@ io_group::io_group(io_queue::config io_cfg, unsigned nr_queues)
: _config(std::move(io_cfg))
, _allocated_on(this_shard_id())
{
auto fg_cfg = make_fair_group_config(_config);
auto fg_cfg = make_throttle_config(_config);
_fgs.emplace_back(fg_cfg, nr_queues);
if (_config.duplex) {
_fgs.emplace_back(fg_cfg, nr_queues);
Expand Down
14 changes: 7 additions & 7 deletions tests/perf/fair_queue_perf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,19 @@
static constexpr fair_queue::class_id cid = 0;

struct local_fq_and_class {
seastar::fair_group fg;
seastar::shared_throttle fg;
seastar::fair_queue fq;
seastar::fair_queue sfq;
unsigned executed = 0;

static fair_group::config fg_config() {
fair_group::config cfg;
static shared_throttle::config fg_config() {
shared_throttle::config cfg;
return cfg;
}

seastar::fair_queue& queue(bool local) noexcept { return local ? fq : sfq; }

local_fq_and_class(seastar::fair_group& sfg)
local_fq_and_class(seastar::shared_throttle& sfg)
: fg(fg_config(), 1)
, fq(fg, seastar::fair_queue::config())
, sfq(sfg, seastar::fair_queue::config())
Expand Down Expand Up @@ -75,10 +75,10 @@ struct perf_fair_queue {

seastar::sharded<local_fq_and_class> local_fq;

seastar::fair_group shared_fg;
seastar::shared_throttle shared_fg;

static fair_group::config fg_config() {
fair_group::config cfg;
static shared_throttle::config fg_config() {
shared_throttle::config cfg;
return cfg;
}

Expand Down
6 changes: 3 additions & 3 deletions tests/unit/fair_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ struct request {
};

class test_env {
fair_group _fg;
shared_throttle _fg;
fair_queue _fq;
std::vector<int> _results;
std::vector<std::vector<std::exception_ptr>> _exceptions;
fair_queue::class_id _nr_classes = 0;
std::vector<request> _inflight;

static fair_group::config fg_config(unsigned cap) {
fair_group::config cfg;
static shared_throttle::config fg_config(unsigned cap) {
shared_throttle::config cfg;
cfg.rate_limit_duration = std::chrono::microseconds(cap);
return cfg;
}
Expand Down

0 comments on commit 33e1356

Please sign in to comment.