diff --git a/CMakeLists.txt b/CMakeLists.txt index 9fd47b9fe03..357f388233d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -691,6 +691,7 @@ add_library (seastar src/core/alien.cc src/core/file.cc src/core/fair_queue.cc + src/core/throttle.cc src/core/reactor_backend.cc src/core/thread_pool.cc src/core/app-template.cc diff --git a/apps/io_tester/ioinfo.cc b/apps/io_tester/ioinfo.cc index d23f265e05b..672752b543f 100644 --- a/apps/io_tester/ioinfo.cc +++ b/apps/io_tester/ioinfo.cc @@ -74,7 +74,7 @@ int main(int ac, char** av) { } out << YAML::EndMap; - const auto& fg = internal::get_fair_group(ioq, internal::io_direction_and_length::write_idx); + const auto& fg = internal::io_throttle(ioq, internal::io_direction_and_length::write_idx); out << YAML::Key << "per_tick_grab_threshold" << YAML::Value << fg.per_tick_grab_threshold(); const auto& tb = fg.token_bucket(); diff --git a/include/seastar/core/fair_queue.hh b/include/seastar/core/fair_queue.hh index 546cb00b601..042702341cc 100644 --- a/include/seastar/core/fair_queue.hh +++ b/include/seastar/core/fair_queue.hh @@ -26,7 +26,7 @@ #include #include #include -#include +#include #include #include @@ -114,6 +114,10 @@ private: capacity_t _capacity; bi::slist_member_hook<> _hook; +protected: + ~fair_queue_entry() = default; + void cancel() noexcept { _capacity = 0; } + public: fair_queue_entry(capacity_t c) noexcept : _capacity(c) {} @@ -123,148 +127,8 @@ public: bi::member_hook, &fair_queue_entry::_hook>>; capacity_t capacity() const noexcept { return _capacity; } -}; - -/// \brief Group of queues class -/// -/// This is a fair group. It's attached by one or mode fair queues. On machines having the -/// big* amount of shards, queues use the group to borrow/lend the needed capacity for -/// requests dispatching. -/// -/// * Big means that when all shards sumbit requests alltogether the disk is unable to -/// dispatch them efficiently. The inability can be of two kinds -- either disk cannot -/// cope with the number of arriving requests, or the total size of the data withing -/// the given time frame exceeds the disk throughput. -class fair_group { -public: - using capacity_t = fair_queue_entry::capacity_t; - using clock_type = std::chrono::steady_clock; - - /* - * tldr; The math - * - * Bw, Br -- write/read bandwidth (bytes per second) - * Ow, Or -- write/read iops (ops per second) - * - * xx_max -- their maximum values (configured) - * - * Throttling formula: - * - * Bw/Bw_max + Br/Br_max + Ow/Ow_max + Or/Or_max <= K - * - * where K is the scalar value <= 1.0 (also configured) - * - * Bandwidth is bytes time derivatite, iops is ops time derivative, i.e. - * Bx = d(bx)/dt, Ox = d(ox)/dt. Then the formula turns into - * - * d(bw/Bw_max + br/Br_max + ow/Ow_max + or/Or_max)/dt <= K - * - * Fair queue tickets are {w, s} weight-size pairs that are - * - * s = read_base_count * br, for reads - * Br_max/Bw_max * read_base_count * bw, for writes - * - * w = read_base_count, for reads - * Or_max/Ow_max * read_base_count, for writes - * - * Thus the formula turns into - * - * d(sum(w/W + s/S))/dr <= K - * - * where {w, s} is the ticket value if a request and sum summarizes the - * ticket values from all the requests seen so far, {W, S} is the ticket - * value that corresonds to a virtual summary of Or_max requests of - * Br_max size total. - */ - - /* - * The normalization results in a float of the 2^-30 seconds order of - * magnitude. 
Not to invent float point atomic arithmetics, the result - * is converted to an integer by multiplying by a factor that's large - * enough to turn these values into a non-zero integer. - * - * Also, the rates in bytes/sec when adjusted by io-queue according to - * multipliers become too large to be stored in 32-bit ticket value. - * Thus the rate resolution is applied. The t.bucket is configured with a - * time period for which the speeds from F (in above formula) are taken. - */ - - static constexpr float fixed_point_factor = float(1 << 24); - using rate_resolution = std::milli; - using token_bucket_t = internal::shared_token_bucket; - -private: - - /* - * The dF/dt <= K limitation is managed by the modified token bucket - * algo where tokens are ticket.normalize(cost_capacity), the refill - * rate is K. - * - * The token bucket algo must have the limit on the number of tokens - * accumulated. Here it's configured so that it accumulates for the - * latency_goal duration. - * - * The replenish threshold is the minimal number of tokens to put back. - * It's reserved for future use to reduce the load on the replenish - * timestamp. - * - * The timestamp, in turn, is the time when the bucket was replenished - * last. Every time a shard tries to get tokens from bucket it first - * tries to convert the time that had passed since this timestamp - * into more tokens in the bucket. - */ - - token_bucket_t _token_bucket; - const capacity_t _per_tick_threshold; - -public: - - // Convert internal capacity value back into the real token - static double capacity_tokens(capacity_t cap) noexcept { - return (double)cap / fixed_point_factor / token_bucket_t::rate_cast(std::chrono::seconds(1)).count(); - } - - // Convert floating-point tokens into the token bucket capacity - static capacity_t tokens_capacity(double tokens) noexcept { - return tokens * token_bucket_t::rate_cast(std::chrono::seconds(1)).count() * fixed_point_factor; - } - - auto capacity_duration(capacity_t cap) const noexcept { - return _token_bucket.duration_for(cap); - } - - struct config { - sstring label = ""; - /* - * There are two "min" values that can be configured. The former one - * is the minimal weight:size pair that the upper layer is going to - * submit. 
However, it can submit _larger_ values, and the fair queue - * must accept those as large as the latter pair (but it can accept - * even larger values, of course) - */ - double min_tokens = 0.0; - double limit_min_tokens = 0.0; - std::chrono::duration rate_limit_duration = std::chrono::milliseconds(1); - }; - - explicit fair_group(config cfg, unsigned nr_queues); - fair_group(fair_group&&) = delete; - - capacity_t maximum_capacity() const noexcept { return _token_bucket.limit(); } - capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; } - capacity_t grab_capacity(capacity_t cap) noexcept; - clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); } - void replenish_capacity(clock_type::time_point now) noexcept; - void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept; - - capacity_t capacity_deficiency(capacity_t from) const noexcept; - - std::chrono::duration rate_limit_duration() const noexcept { - std::chrono::duration dur((double)_token_bucket.limit() / _token_bucket.rate()); - return std::chrono::duration_cast>(dur); - } - - const token_bucket_t& token_bucket() const noexcept { return _token_bucket; } + virtual throttle::grab_result can_dispatch() const noexcept = 0; + virtual void dispatch() = 0; }; /// \brief Fair queuing class @@ -299,7 +163,7 @@ public: using class_id = unsigned int; class priority_class_data; - using capacity_t = fair_group::capacity_t; + using capacity_t = fair_queue_entry::capacity_t; using signed_capacity_t = std::make_signed_t; private: @@ -322,8 +186,6 @@ private: }; config _config; - fair_group& _group; - clock_type::time_point _group_replenish; fair_queue_ticket _resources_executing; fair_queue_ticket _resources_queued; priority_queue _handles; @@ -331,40 +193,17 @@ private: size_t _nr_classes = 0; capacity_t _last_accumulated = 0; - /* - * When the shared capacity os over the local queue delays - * further dispatching untill better times - * - * \head -- the value group head rover is expected to cross - * \cap -- the capacity that's accounted on the group - * - * The last field is needed to "rearm" the wait in case - * queue decides that it wants to dispatch another capacity - * in the middle of the waiting - */ - struct pending { - capacity_t head; - capacity_t cap; - - pending(capacity_t t, capacity_t c) noexcept : head(t), cap(c) {} - }; - - std::optional _pending; - void push_priority_class(priority_class_data& pc) noexcept; void push_priority_class_from_idle(priority_class_data& pc) noexcept; void pop_priority_class(priority_class_data& pc) noexcept; void plug_priority_class(priority_class_data& pc) noexcept; void unplug_priority_class(priority_class_data& pc) noexcept; - enum class grab_result { grabbed, cant_preempt, pending }; - grab_result grab_capacity(const fair_queue_entry& ent) noexcept; - grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept; public: /// Constructs a fair queue with configuration parameters \c cfg. 
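/// [Editorial example, not part of the patch] After this change the queue is
/// constructed from a config alone; the fair_group reference is gone, and
/// shared-capacity decisions are delegated to each entry's can_dispatch().
/// A minimal sketch (field values are hypothetical, tau mirrors the unit test):
/// \code
/// seastar::fair_queue::config cfg;
/// cfg.tau = std::chrono::microseconds(50);
/// seastar::fair_queue fq(cfg);   // note: no fair_group/shared_throttle argument
/// \endcode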
/// /// \param cfg an instance of the class \ref config - explicit fair_queue(fair_group& shared, config cfg); + explicit fair_queue(config cfg); fair_queue(fair_queue&&) = delete; ~fair_queue(); @@ -388,14 +227,6 @@ public: /// \return the amount of resources (weight, size) currently executing fair_queue_ticket resources_currently_executing() const; - capacity_t tokens_capacity(double tokens) const noexcept { - return _group.tokens_capacity(tokens); - } - - capacity_t maximum_capacity() const noexcept { - return _group.maximum_capacity(); - } - /// Queue the entry \c ent through this class' \ref fair_queue /// /// The user of this interface is supposed to call \ref notify_requests_finished when the @@ -405,15 +236,8 @@ public: void plug_class(class_id c) noexcept; void unplug_class(class_id c) noexcept; - /// Notifies that ont request finished - /// \param desc an instance of \c fair_queue_ticket structure describing the request that just finished. - void notify_request_finished(fair_queue_entry::capacity_t cap) noexcept; - void notify_request_cancelled(fair_queue_entry& ent) noexcept; - /// Try to execute new requests if there is capacity left in the queue. - void dispatch_requests(std::function cb); - - clock_type::time_point next_pending_aio() const noexcept; + void dispatch_requests(); std::vector metrics(class_id c); }; diff --git a/include/seastar/core/internal/throttle.hh b/include/seastar/core/internal/throttle.hh new file mode 100644 index 00000000000..1c3da3e5ca5 --- /dev/null +++ b/include/seastar/core/internal/throttle.hh @@ -0,0 +1,220 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Copyright (C) 2016 ScyllaDB + */ +#pragma once + +#include +#include + +namespace seastar { + +/// \brief Group of queues class +/// +/// This is a fair group. It's attached by one or mode fair queues. On machines having the +/// big* amount of shards, queues use the group to borrow/lend the needed capacity for +/// requests dispatching. +/// +/// * Big means that when all shards sumbit requests alltogether the disk is unable to +/// dispatch them efficiently. The inability can be of two kinds -- either disk cannot +/// cope with the number of arriving requests, or the total size of the data withing +/// the given time frame exceeds the disk throughput. 
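// ---------------------------------------------------------------------------
// [Editorial sketch, not part of the patch] A toy model of the shared-capacity
// idea described above: every per-shard queue claims tokens from one shared
// counter, and a claim that outruns replenishment reports a "deficiency" that
// tells the claimant to wait instead of dispatching.  All names here
// (toy_shared_bucket, toy_try_dispatch) are hypothetical; the real machinery
// is seastar::internal::shared_token_bucket wrapped by shared_throttle below.
#include <atomic>
#include <cstdint>

class toy_shared_bucket {
    std::atomic<uint64_t> _head{0};   // tokens claimed by all queues so far
    std::atomic<uint64_t> _tail{0};   // tokens made available by replenishment
public:
    // Claim `cap` tokens and return the head value this claim must wait for.
    uint64_t grab(uint64_t cap) noexcept {
        return _head.fetch_add(cap, std::memory_order_relaxed) + cap;
    }
    // How many of the claimed tokens are not yet covered by replenishment.
    uint64_t deficiency(uint64_t head) const noexcept {
        uint64_t tail = _tail.load(std::memory_order_relaxed);
        return head > tail ? head - tail : 0;
    }
    // Called periodically to convert elapsed time into fresh tokens.
    void replenish(uint64_t tokens) noexcept {
        _tail.fetch_add(tokens, std::memory_order_relaxed);
    }
};

// A queue may dispatch only when its claim is fully covered; otherwise it
// remembers `head` and retries later -- the grabbed/pending split that
// throttle::grab_capacity() implements further below.
inline bool toy_try_dispatch(toy_shared_bucket& bucket, uint64_t cap) {
    return bucket.deficiency(bucket.grab(cap)) == 0;
}
// ---------------------------------------------------------------------------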
+class shared_throttle { +public: + using capacity_t = uint64_t; // XXX -- this could be a template parameter + using clock_type = std::chrono::steady_clock; + + /* + * tldr; The math + * + * Bw, Br -- write/read bandwidth (bytes per second) + * Ow, Or -- write/read iops (ops per second) + * + * xx_max -- their maximum values (configured) + * + * Throttling formula: + * + * Bw/Bw_max + Br/Br_max + Ow/Ow_max + Or/Or_max <= K + * + * where K is the scalar value <= 1.0 (also configured) + * + * Bandwidth is bytes time derivatite, iops is ops time derivative, i.e. + * Bx = d(bx)/dt, Ox = d(ox)/dt. Then the formula turns into + * + * d(bw/Bw_max + br/Br_max + ow/Ow_max + or/Or_max)/dt <= K + * + * Fair queue tickets are {w, s} weight-size pairs that are + * + * s = read_base_count * br, for reads + * Br_max/Bw_max * read_base_count * bw, for writes + * + * w = read_base_count, for reads + * Or_max/Ow_max * read_base_count, for writes + * + * Thus the formula turns into + * + * d(sum(w/W + s/S))/dr <= K + * + * where {w, s} is the ticket value if a request and sum summarizes the + * ticket values from all the requests seen so far, {W, S} is the ticket + * value that corresonds to a virtual summary of Or_max requests of + * Br_max size total. + */ + + /* + * The normalization results in a float of the 2^-30 seconds order of + * magnitude. Not to invent float point atomic arithmetics, the result + * is converted to an integer by multiplying by a factor that's large + * enough to turn these values into a non-zero integer. + * + * Also, the rates in bytes/sec when adjusted by io-queue according to + * multipliers become too large to be stored in 32-bit ticket value. + * Thus the rate resolution is applied. The t.bucket is configured with a + * time period for which the speeds from F (in above formula) are taken. + */ + + static constexpr float fixed_point_factor = float(1 << 24); + using rate_resolution = std::milli; + using token_bucket_t = internal::shared_token_bucket; + +private: + + /* + * The dF/dt <= K limitation is managed by the modified token bucket + * algo where tokens are ticket.normalize(cost_capacity), the refill + * rate is K. + * + * The token bucket algo must have the limit on the number of tokens + * accumulated. Here it's configured so that it accumulates for the + * latency_goal duration. + * + * The replenish threshold is the minimal number of tokens to put back. + * It's reserved for future use to reduce the load on the replenish + * timestamp. + * + * The timestamp, in turn, is the time when the bucket was replenished + * last. Every time a shard tries to get tokens from bucket it first + * tries to convert the time that had passed since this timestamp + * into more tokens in the bucket. + */ + + token_bucket_t _token_bucket; + const capacity_t _per_tick_threshold; + +public: + + // Convert internal capacity value back into the real token + static double capacity_tokens(capacity_t cap) noexcept { + return (double)cap / fixed_point_factor / token_bucket_t::rate_cast(std::chrono::seconds(1)).count(); + } + + // Convert floating-point tokens into the token bucket capacity + static capacity_t tokens_capacity(double tokens) noexcept { + return tokens * token_bucket_t::rate_cast(std::chrono::seconds(1)).count() * fixed_point_factor; + } + + auto capacity_duration(capacity_t cap) const noexcept { + return _token_bucket.duration_for(cap); + } + + struct config { + sstring label = ""; + /* + * There are two "min" values that can be configured. 
The former one + * is the minimal weight:size pair that the upper layer is going to + * submit. However, it can submit _larger_ values, and the fair queue + * must accept those as large as the latter pair (but it can accept + * even larger values, of course) + */ + double min_tokens = 0.0; + double limit_min_tokens = 0.0; + std::chrono::duration rate_limit_duration = std::chrono::milliseconds(1); + }; + + explicit shared_throttle(config cfg, unsigned nr_queues); + shared_throttle(shared_throttle&&) = delete; + + capacity_t maximum_capacity() const noexcept { return _token_bucket.limit(); } + capacity_t per_tick_grab_threshold() const noexcept { return _per_tick_threshold; } + capacity_t grab_capacity(capacity_t cap) noexcept; + clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); } + void replenish_capacity(clock_type::time_point now) noexcept; + void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept; + + capacity_t capacity_deficiency(capacity_t from) const noexcept; + + std::chrono::duration rate_limit_duration() const noexcept { + std::chrono::duration dur((double)_token_bucket.limit() / _token_bucket.rate()); + return std::chrono::duration_cast>(dur); + } + + const token_bucket_t& token_bucket() const noexcept { return _token_bucket; } +}; + +class throttle { + using clock_type = std::chrono::steady_clock; + + /* + * When the shared capacity os over the local queue delays + * further dispatching untill better times + * + * \head -- the value group head rover is expected to cross + * \cap -- the capacity that's accounted on the group + * + * The last field is needed to "rearm" the wait in case + * queue decides that it wants to dispatch another capacity + * in the middle of the waiting + */ + struct pending { + shared_throttle::capacity_t head; + shared_throttle::capacity_t cap; + + pending(shared_throttle::capacity_t t, shared_throttle::capacity_t c) noexcept : head(t), cap(c) {} + }; + + shared_throttle& _group; + clock_type::time_point _group_replenish; + std::optional _pending; +public: + throttle(shared_throttle& st) noexcept + : _group(st) + , _group_replenish(clock_type::now()) + {} + + enum class grab_result { grabbed, cant_preempt, pending }; + grab_result grab_capacity(shared_throttle::capacity_t) noexcept; + grab_result grab_pending_capacity(shared_throttle::capacity_t) noexcept; + + shared_throttle::capacity_t tokens_capacity(double tokens) const noexcept { + return _group.tokens_capacity(tokens); + } + + shared_throttle::capacity_t maximum_capacity() const noexcept { + return _group.maximum_capacity(); + } + + shared_throttle::capacity_t per_tick_grab_threshold() const noexcept { + return _group.per_tick_grab_threshold(); + } + + clock_type::time_point next_pending() const noexcept; +}; + +} // seastar namespace diff --git a/include/seastar/core/io_queue.hh b/include/seastar/core/io_queue.hh index af6cda1c737..bccfd70c900 100644 --- a/include/seastar/core/io_queue.hh +++ b/include/seastar/core/io_queue.hh @@ -43,7 +43,7 @@ namespace seastar { class io_queue; namespace internal { -const fair_group& get_fair_group(const io_queue& ioq, unsigned stream); +const shared_throttle& io_throttle(const io_queue& ioq, unsigned stream); } #if SEASTAR_API_LEVEL < 7 @@ -93,11 +93,16 @@ public: private: std::vector> _priority_classes; io_group_ptr _group; - boost::container::static_vector _streams; + struct stream { + fair_queue fq; + throttle thr; + stream(shared_throttle&, fair_queue::config); + }; + 
boost::container::static_vector _streams; internal::io_sink& _sink; friend struct ::io_queue_for_tests; - friend const fair_group& internal::get_fair_group(const io_queue& ioq, unsigned stream); + friend const shared_throttle& internal::io_throttle(const io_queue& ioq, unsigned stream); priority_class_data& find_or_create_class(internal::priority_class pc); future queue_request(internal::priority_class pc, internal::io_direction_and_length dnl, internal::io_request req, io_intent* intent, iovec_keeper iovs) noexcept; @@ -163,9 +168,9 @@ public: future submit_io_write(internal::priority_class priority_class, size_t len, internal::io_request req, io_intent* intent, iovec_keeper iovs = {}) noexcept; + throttle::grab_result can_dispatch_request(const queued_io_request& rq) noexcept; void submit_request(io_desc_read_write* desc, internal::io_request req) noexcept; void cancel_request(queued_io_request& req) noexcept; - void complete_cancelled_request(queued_io_request& req) noexcept; void complete_request(io_desc_read_write& desc) noexcept; [[deprecated("I/O queue users should not track individual requests, but resources (weight, size) passing through the queue")]] @@ -218,16 +223,16 @@ public: private: friend class io_queue; friend struct ::io_queue_for_tests; - friend const fair_group& internal::get_fair_group(const io_queue& ioq, unsigned stream); + friend const shared_throttle& internal::io_throttle(const io_queue& ioq, unsigned stream); const io_queue::config _config; size_t _max_request_length[2]; - boost::container::static_vector _fgs; + boost::container::static_vector _throttle; std::vector> _priority_classes; util::spinlock _lock; const shard_id _allocated_on; - static fair_group::config make_fair_group_config(const io_queue::config& qcfg) noexcept; + static shared_throttle::config make_throttle_config(const io_queue::config& qcfg) noexcept; priority_class_data& find_or_create_class(internal::priority_class pc); }; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index bc7523f7a43..6e1daf203cb 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -16,6 +16,7 @@ target_sources (seastar-module core/exception_hacks.cc core/execution_stage.cc core/fair_queue.cc + core/throttle.cc core/file.cc core/fsnotify.cc core/fstream.cc diff --git a/src/core/fair_queue.cc b/src/core/fair_queue.cc index 1ff282d42b0..26b08d819b1 100644 --- a/src/core/fair_queue.cc +++ b/src/core/fair_queue.cc @@ -102,41 +102,6 @@ fair_queue_ticket wrapping_difference(const fair_queue_ticket& a, const fair_que std::max(a._size - b._size, 0)); } -fair_group::fair_group(config cfg, unsigned nr_queues) - : _token_bucket(fixed_point_factor, - std::max(fixed_point_factor * token_bucket_t::rate_cast(cfg.rate_limit_duration).count(), tokens_capacity(cfg.limit_min_tokens)), - tokens_capacity(cfg.min_tokens) - ) - , _per_tick_threshold(_token_bucket.limit() / nr_queues) -{ - if (tokens_capacity(cfg.min_tokens) > _token_bucket.threshold()) { - throw std::runtime_error("Fair-group replenisher limit is lower than threshold"); - } -} - -auto fair_group::grab_capacity(capacity_t cap) noexcept -> capacity_t { - assert(cap <= _token_bucket.limit()); - return _token_bucket.grab(cap); -} - -void fair_group::replenish_capacity(clock_type::time_point now) noexcept { - _token_bucket.replenish(now); -} - -void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept { - auto now = clock_type::now(); - auto extra = _token_bucket.accumulated_in(now - local_ts); - - if (extra >= _token_bucket.threshold()) { - 
local_ts = now; - replenish_capacity(now); - } -} - -auto fair_group::capacity_deficiency(capacity_t from) const noexcept -> capacity_t { - return _token_bucket.deficiency(from); -} - // Priority class, to be used with a given fair_queue class fair_queue::priority_class_data { friend class fair_queue; @@ -161,10 +126,8 @@ bool fair_queue::class_compare::operator() (const priority_class_ptr& lhs, const return lhs->_accumulated > rhs->_accumulated; } -fair_queue::fair_queue(fair_group& group, config cfg) +fair_queue::fair_queue(config cfg) : _config(std::move(cfg)) - , _group(group) - , _group_replenish(clock_type::now()) { } @@ -187,7 +150,7 @@ void fair_queue::push_priority_class_from_idle(priority_class_data& pc) noexcept // duration. For this estimate how many capacity units can be // accumulated with the current class shares per rate resulution // and scale it up to tau. - capacity_t max_deviation = fair_group::fixed_point_factor / pc._shares * fair_group::token_bucket_t::rate_cast(_config.tau).count(); + capacity_t max_deviation = shared_throttle::fixed_point_factor / pc._shares * shared_throttle::token_bucket_t::rate_cast(_config.tau).count(); // On start this deviation can go to negative values, so not to // introduce extra if's for that short corner case, use signed // arithmetics and make sure the _accumulated value doesn't grow @@ -227,37 +190,6 @@ void fair_queue::unplug_class(class_id cid) noexcept { unplug_priority_class(*_priority_classes[cid]); } -auto fair_queue::grab_pending_capacity(const fair_queue_entry& ent) noexcept -> grab_result { - _group.maybe_replenish_capacity(_group_replenish); - - if (_group.capacity_deficiency(_pending->head)) { - return grab_result::pending; - } - - capacity_t cap = ent._capacity; - if (cap > _pending->cap) { - return grab_result::cant_preempt; - } - - _pending.reset(); - return grab_result::grabbed; -} - -auto fair_queue::grab_capacity(const fair_queue_entry& ent) noexcept -> grab_result { - if (_pending) { - return grab_pending_capacity(ent); - } - - capacity_t cap = ent._capacity; - capacity_t want_head = _group.grab_capacity(cap); - if (_group.capacity_deficiency(want_head)) { - _pending.emplace(want_head, cap); - return grab_result::pending; - } - - return grab_result::grabbed; -} - void fair_queue::register_priority_class(class_id id, uint32_t shares) { if (id >= _priority_classes.size()) { _priority_classes.resize(id + 1); @@ -303,38 +235,11 @@ void fair_queue::queue(class_id id, fair_queue_entry& ent) noexcept { pc._queue.push_back(ent); } -void fair_queue::notify_request_finished(fair_queue_entry::capacity_t cap) noexcept { -} - -void fair_queue::notify_request_cancelled(fair_queue_entry& ent) noexcept { - ent._capacity = 0; -} - -fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept { - if (_pending) { - /* - * We expect the disk to release the ticket within some time, - * but it's ... OK if it doesn't -- the pending wait still - * needs the head rover value to be ahead of the needed value. - * - * It may happen that the capacity gets released before we think - * it will, in this case we will wait for the full value again, - * which's sub-optimal. The expectation is that we think disk - * works faster, than it really does. 
- */ - auto over = _group.capacity_deficiency(_pending->head); - auto ticks = _group.capacity_duration(over); - return std::chrono::steady_clock::now() + std::chrono::duration_cast(ticks); - } - - return std::chrono::steady_clock::time_point::max(); -} - -void fair_queue::dispatch_requests(std::function cb) { +void fair_queue::dispatch_requests() { capacity_t dispatched = 0; boost::container::small_vector preempt; - while (!_handles.empty() && (dispatched < _group.per_tick_grab_threshold())) { + while (!_handles.empty()) { priority_class_data& h = *_handles.top(); if (h._queue.empty() || !h._plugged) { pop_priority_class(h); @@ -342,12 +247,12 @@ void fair_queue::dispatch_requests(std::function cb) { } auto& req = h._queue.front(); - auto gr = grab_capacity(req); - if (gr == grab_result::pending) { + auto gr = req.can_dispatch(); + if (gr == throttle::grab_result::pending) { break; } - if (gr == grab_result::cant_preempt) { + if (gr == throttle::grab_result::cant_preempt) { pop_priority_class(h); preempt.emplace_back(&h); continue; @@ -380,7 +285,7 @@ void fair_queue::dispatch_requests(std::function cb) { h._pure_accumulated += req_cap; dispatched += req_cap; - cb(req); + req.dispatch(); if (h._plugged && !h._queue.empty()) { push_priority_class(h); @@ -397,10 +302,10 @@ std::vector fair_queue::metrics( priority_class_data& pc = *_priority_classes[c]; return std::vector({ sm::make_counter("consumption", - [&pc] { return fair_group::capacity_tokens(pc._pure_accumulated); }, + [&pc] { return shared_throttle::capacity_tokens(pc._pure_accumulated); }, sm::description("Accumulated disk capacity units consumed by this class; an increment per-second rate indicates full utilization")), sm::make_counter("adjusted_consumption", - [&pc] { return fair_group::capacity_tokens(pc._accumulated); }, + [&pc] { return shared_throttle::capacity_tokens(pc._accumulated); }, sm::description("Consumed disk capacity units adjusted for class shares and idling preemption")), }); } diff --git a/src/core/io_queue.cc b/src/core/io_queue.cc index ca4fea2169a..303bbf47f25 100644 --- a/src/core/io_queue.cc +++ b/src/core/io_queue.cc @@ -211,6 +211,11 @@ class io_queue::priority_class_data { metrics::metric_groups metric_groups; }; +io_queue::stream::stream(shared_throttle& st, fair_queue::config cfg) + : fq(std::move(cfg)) + , thr(st) +{ } + class io_desc_read_write final : public io_completion { io_queue& _ioq; io_queue::priority_class_data& _pclass; @@ -272,10 +277,9 @@ class io_desc_read_write final : public io_completion { stream_id stream() const noexcept { return _stream; } }; -class queued_io_request : private internal::io_request { +class queued_io_request final : private internal::io_request, public fair_queue_entry { io_queue& _ioq; const stream_id _stream; - fair_queue_entry _fq_entry; internal::cancellable_queue::link _intent; std::unique_ptr _desc; @@ -284,18 +288,22 @@ class queued_io_request : private internal::io_request { public: queued_io_request(internal::io_request req, io_queue& q, fair_queue_entry::capacity_t cap, io_queue::priority_class_data& pc, io_direction_and_length dnl, iovec_keeper iovs) : io_request(std::move(req)) + , fair_queue_entry(cap) , _ioq(q) , _stream(_ioq.request_stream(dnl)) - , _fq_entry(cap) , _desc(std::make_unique(_ioq, pc, _stream, dnl, cap, std::move(iovs))) { } queued_io_request(queued_io_request&&) = delete; + ~queued_io_request() = default; - void dispatch() noexcept { + virtual throttle::grab_result can_dispatch() const noexcept { + return 
_ioq.can_dispatch_request(*this); + } + + virtual void dispatch() noexcept override { if (is_cancelled()) { - _ioq.complete_cancelled_request(*this); delete this; return; } @@ -308,6 +316,7 @@ class queued_io_request : private internal::io_request { void cancel() noexcept { _ioq.cancel_request(*this); + fair_queue_entry::cancel(); _desc.release()->cancel(); } @@ -316,13 +325,8 @@ class queued_io_request : private internal::io_request { } future get_future() noexcept { return _desc->get_future(); } - fair_queue_entry& queue_entry() noexcept { return _fq_entry; } stream_id stream() const noexcept { return _stream; } - static queued_io_request& from_fq_entry(fair_queue_entry& ent) noexcept { - return *boost::intrusive::get_parent_from_member(&ent, &queued_io_request::_fq_entry); - } - static queued_io_request& from_cq_link(internal::cancellable_queue::link& link) noexcept { return *boost::intrusive::get_parent_from_member(&link, &queued_io_request::_intent); } @@ -527,8 +531,8 @@ sstring io_request::opname() const { std::abort(); } -const fair_group& get_fair_group(const io_queue& ioq, unsigned stream) { - return ioq._group->_fgs[stream]; +const shared_throttle& io_throttle(const io_queue& ioq, unsigned stream) { + return ioq._group->_throttle[stream]; } } // internal namespace @@ -551,7 +555,6 @@ void io_queue::complete_request(io_desc_read_write& desc) noexcept { _requests_executing--; _requests_completed++; - _streams[desc.stream()].notify_request_finished(desc.capacity()); } fair_queue::config io_queue::make_fair_queue_config(const config& iocfg, sstring label) { @@ -569,11 +572,11 @@ io_queue::io_queue(io_group_ptr group, internal::io_sink& sink) auto& cfg = get_config(); if (cfg.duplex) { static_assert(internal::io_direction_and_length::write_idx == 0); - _streams.emplace_back(_group->_fgs[0], make_fair_queue_config(cfg, "write")); + _streams.emplace_back(_group->_throttle[0], make_fair_queue_config(cfg, "write")); static_assert(internal::io_direction_and_length::read_idx == 1); - _streams.emplace_back(_group->_fgs[1], make_fair_queue_config(cfg, "read")); + _streams.emplace_back(_group->_throttle[1], make_fair_queue_config(cfg, "read")); } else { - _streams.emplace_back(_group->_fgs[0], make_fair_queue_config(cfg, "rw")); + _streams.emplace_back(_group->_throttle[0], make_fair_queue_config(cfg, "rw")); } _flow_ratio_update.arm_periodic(std::chrono::duration_cast(_group->io_latency_goal() * cfg.flow_ratio_ticks)); @@ -588,8 +591,8 @@ io_queue::io_queue(io_group_ptr group, internal::io_sink& sink) }); } -fair_group::config io_group::make_fair_group_config(const io_queue::config& qcfg) noexcept { - fair_group::config cfg; +shared_throttle::config io_group::make_throttle_config(const io_queue::config& qcfg) noexcept { + shared_throttle::config cfg; cfg.label = fmt::format("io-queue-{}", qcfg.devid); double min_weight = std::min(io_queue::read_request_base_count, qcfg.disk_req_write_to_read_multiplier); double min_size = std::min(io_queue::read_request_base_count, qcfg.disk_blocks_write_to_read_multiplier); @@ -602,17 +605,17 @@ fair_group::config io_group::make_fair_group_config(const io_queue::config& qcfg } std::chrono::duration io_group::io_latency_goal() const noexcept { - return _fgs.front().rate_limit_duration(); + return _throttle.front().rate_limit_duration(); } io_group::io_group(io_queue::config io_cfg, unsigned nr_queues) : _config(std::move(io_cfg)) , _allocated_on(this_shard_id()) { - auto fg_cfg = make_fair_group_config(_config); - _fgs.emplace_back(fg_cfg, nr_queues); + auto 
fg_cfg = make_throttle_config(_config); + _throttle.emplace_back(fg_cfg, nr_queues); if (_config.duplex) { - _fgs.emplace_back(fg_cfg, nr_queues); + _throttle.emplace_back(fg_cfg, nr_queues); } auto goal = io_latency_goal(); @@ -629,10 +632,10 @@ io_group::io_group(io_queue::config io_cfg, unsigned nr_queues) */ auto update_max_size = [this] (unsigned idx) { auto g_idx = _config.duplex ? idx : 0; - auto max_cap = _fgs[g_idx].maximum_capacity(); + auto max_cap = _throttle[g_idx].maximum_capacity(); for (unsigned shift = 0; ; shift++) { auto tokens = internal::request_tokens(io_direction_and_length(idx, 1 << (shift + io_queue::block_size_shift)), _config); - auto cap = _fgs[g_idx].tokens_capacity(tokens); + auto cap = _throttle[g_idx].tokens_capacity(tokens); if (cap > max_cap) { if (shift == 0) { throw std::runtime_error("IO-group limits are too low"); @@ -661,7 +664,7 @@ io_queue::~io_queue() { for (auto&& pc_data : _priority_classes) { if (pc_data) { for (auto&& s : _streams) { - s.unregister_priority_class(pc_data->fq_class()); + s.fq.unregister_priority_class(pc_data->fq_class()); } } } @@ -833,8 +836,8 @@ void io_queue::register_stats(sstring name, priority_class_data& pc) { } for (auto&& s : _streams) { - for (auto&& m : s.metrics(pc.fq_class())) { - m(owner_l)(mnt_l)(class_l)(group_l)(sm::label("stream")(s.label())); + for (auto&& m : s.fq.metrics(pc.fq_class())) { + m(owner_l)(mnt_l)(class_l)(group_l)(sm::label("stream")(s.fq.label())); metrics.emplace_back(std::move(m)); } } @@ -866,7 +869,7 @@ io_queue::priority_class_data& io_queue::find_or_create_class(internal::priority // This conveys all the information we need and allows one to easily group all classes from // the same I/O queue (by filtering by shard) for (auto&& s : _streams) { - s.register_priority_class(id, shares); + s.fq.register_priority_class(id, shares); } auto& pg = _group->find_or_create_class(pc); auto pc_data = std::make_unique(pc, shares, *this, pg); @@ -920,12 +923,12 @@ fair_queue_entry::capacity_t io_queue::request_capacity(io_direction_and_length const auto& cfg = get_config(); auto tokens = internal::request_tokens(dnl, cfg); if (_flow_ratio <= cfg.flow_ratio_backpressure_threshold) { - return _streams[request_stream(dnl)].tokens_capacity(tokens); + return _streams[request_stream(dnl)].thr.tokens_capacity(tokens); } auto stream = request_stream(dnl); - auto cap = _streams[stream].tokens_capacity(tokens * _flow_ratio); - auto max_cap = _streams[stream].maximum_capacity(); + auto cap = _streams[stream].thr.tokens_capacity(tokens * _flow_ratio); + auto max_cap = _streams[stream].thr.maximum_capacity(); return std::min(cap, max_cap); } @@ -949,7 +952,7 @@ future io_queue::queue_one_request(internal::priority_class pc, io_direc queued_req->set_intent(cq); } - _streams[queued_req->stream()].queue(pclass.fq_class(), queued_req->queue_entry()); + _streams[queued_req->stream()].fq.queue(pclass.fq_class(), *queued_req); queued_req.release(); pclass.on_queue(); _queued_requests++; @@ -1032,9 +1035,7 @@ future io_queue::submit_io_write(internal::priority_class pc, size_t len void io_queue::poll_io_queue() { for (auto&& st : _streams) { - st.dispatch_requests([] (fair_queue_entry& fqe) { - queued_io_request::from_fq_entry(fqe).dispatch(); - }); + st.fq.dispatch_requests(); } } @@ -1045,20 +1046,19 @@ void io_queue::submit_request(io_desc_read_write* desc, internal::io_request req _sink.submit(desc, std::move(req)); } -void io_queue::cancel_request(queued_io_request& req) noexcept { - _queued_requests--; - 
_streams[req.stream()].notify_request_cancelled(req.queue_entry()); +throttle::grab_result io_queue::can_dispatch_request(const queued_io_request& rq) noexcept { + return _streams[rq.stream()].thr.grab_capacity(rq.capacity()); } -void io_queue::complete_cancelled_request(queued_io_request& req) noexcept { - _streams[req.stream()].notify_request_finished(req.queue_entry().capacity()); +void io_queue::cancel_request(queued_io_request& req) noexcept { + _queued_requests--; } io_queue::clock_type::time_point io_queue::next_pending_aio() const noexcept { clock_type::time_point next = clock_type::time_point::max(); for (const auto& s : _streams) { - clock_type::time_point n = s.next_pending_aio(); + clock_type::time_point n = s.thr.next_pending(); if (n < next) { next = std::move(n); } @@ -1072,7 +1072,7 @@ io_queue::update_shares_for_class(internal::priority_class pc, size_t new_shares auto& pclass = find_or_create_class(pc); pclass.update_shares(new_shares); for (auto&& s : _streams) { - s.update_shares_for_class(pclass.fq_class(), new_shares); + s.fq.update_shares_for_class(pclass.fq_class(), new_shares); } } @@ -1102,13 +1102,13 @@ io_queue::rename_priority_class(internal::priority_class pc, sstring new_name) { void io_queue::throttle_priority_class(const priority_class_data& pc) noexcept { for (auto&& s : _streams) { - s.unplug_class(pc.fq_class()); + s.fq.unplug_class(pc.fq_class()); } } void io_queue::unthrottle_priority_class(const priority_class_data& pc) noexcept { for (auto&& s : _streams) { - s.plug_class(pc.fq_class()); + s.fq.plug_class(pc.fq_class()); } } diff --git a/src/core/throttle.cc b/src/core/throttle.cc new file mode 100644 index 00000000000..1fbc746b640 --- /dev/null +++ b/src/core/throttle.cc @@ -0,0 +1,114 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Copyright 2024 ScyllaDB + */ + +#ifdef SEASTAR_MODULE +module; +#endif + +#include + +namespace seastar { + +shared_throttle::shared_throttle(config cfg, unsigned nr_queues) + : _token_bucket(fixed_point_factor, + std::max(fixed_point_factor * token_bucket_t::rate_cast(cfg.rate_limit_duration).count(), tokens_capacity(cfg.limit_min_tokens)), + tokens_capacity(cfg.min_tokens) + ) + , _per_tick_threshold(_token_bucket.limit() / nr_queues) +{ + if (tokens_capacity(cfg.min_tokens) > _token_bucket.threshold()) { + throw std::runtime_error("Fair-group replenisher limit is lower than threshold"); + } +} + +auto shared_throttle::grab_capacity(capacity_t cap) noexcept -> capacity_t { + assert(cap <= _token_bucket.limit()); + return _token_bucket.grab(cap); +} + +void shared_throttle::replenish_capacity(clock_type::time_point now) noexcept { + _token_bucket.replenish(now); +} + +void shared_throttle::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept { + auto now = clock_type::now(); + auto extra = _token_bucket.accumulated_in(now - local_ts); + + if (extra >= _token_bucket.threshold()) { + local_ts = now; + replenish_capacity(now); + } +} + +auto shared_throttle::capacity_deficiency(capacity_t from) const noexcept -> capacity_t { + return _token_bucket.deficiency(from); +} + +auto throttle::grab_pending_capacity(shared_throttle::capacity_t cap) noexcept -> grab_result { + _group.maybe_replenish_capacity(_group_replenish); + + if (_group.capacity_deficiency(_pending->head)) { + return grab_result::pending; + } + + if (cap > _pending->cap) { + return grab_result::cant_preempt; + } + + _pending.reset(); + return grab_result::grabbed; +} + +auto throttle::grab_capacity(shared_throttle::capacity_t cap) noexcept -> grab_result { + if (_pending) { + return grab_pending_capacity(cap); + } + + auto want_head = _group.grab_capacity(cap); + if (_group.capacity_deficiency(want_head)) { + _pending.emplace(want_head, cap); + return grab_result::pending; + } + + return grab_result::grabbed; +} + +throttle::clock_type::time_point throttle::next_pending() const noexcept { + if (_pending) { + /* + * We expect the disk to release the ticket within some time, + * but it's ... OK if it doesn't -- the pending wait still + * needs the head rover value to be ahead of the needed value. + * + * It may happen that the capacity gets released before we think + * it will, in this case we will wait for the full value again, + * which's sub-optimal. The expectation is that we think disk + * works faster, than it really does. + */ + auto over = _group.capacity_deficiency(_pending->head); + auto ticks = _group.capacity_duration(over); + return std::chrono::steady_clock::now() + std::chrono::duration_cast(ticks); + } + + return std::chrono::steady_clock::time_point::max(); +} + +} // seastar namespace diff --git a/tests/perf/fair_queue_perf.cc b/tests/perf/fair_queue_perf.cc index 515f015f6ff..decc03d7d45 100644 --- a/tests/perf/fair_queue_perf.cc +++ b/tests/perf/fair_queue_perf.cc @@ -32,22 +32,15 @@ static constexpr fair_queue::class_id cid = 0; struct local_fq_and_class { - seastar::fair_group fg; seastar::fair_queue fq; seastar::fair_queue sfq; unsigned executed = 0; - static fair_group::config fg_config() { - fair_group::config cfg; - return cfg; - } - seastar::fair_queue& queue(bool local) noexcept { return local ? 
fq : sfq; } - local_fq_and_class(seastar::fair_group& sfg) - : fg(fg_config(), 1) - , fq(fg, seastar::fair_queue::config()) - , sfq(sfg, seastar::fair_queue::config()) + local_fq_and_class() + : fq(seastar::fair_queue::config()) + , sfq(seastar::fair_queue::config()) { fq.register_priority_class(cid, 1); sfq.register_priority_class(cid, 1); @@ -59,14 +52,22 @@ struct local_fq_and_class { } }; -struct local_fq_entry { - seastar::fair_queue_entry ent; +struct local_fq_entry final : public seastar::fair_queue_entry { std::function submit; template local_fq_entry(fair_queue_entry::capacity_t cap, Func&& f) - : ent(cap) + : fair_queue_entry(cap) , submit(std::move(f)) {} + + virtual throttle::grab_result can_dispatch() const noexcept override { + return throttle::grab_result::grabbed; + } + + virtual void dispatch() noexcept override { + submit(); + delete this; + } }; struct perf_fair_queue { @@ -75,17 +76,9 @@ struct perf_fair_queue { seastar::sharded local_fq; - seastar::fair_group shared_fg; - - static fair_group::config fg_config() { - fair_group::config cfg; - return cfg; - } - perf_fair_queue() - : shared_fg(fg_config(), smp::count) { - local_fq.start(std::ref(shared_fg)).get(); + local_fq.start().get(); } ~perf_fair_queue() { @@ -99,12 +92,10 @@ future<> perf_fair_queue::test(bool loc) { auto invokers = local_fq.invoke_on_all([loc] (local_fq_and_class& local) { return parallel_for_each(boost::irange(0u, requests_to_dispatch), [&local, loc] (unsigned dummy) { - auto cap = local.queue(loc).tokens_capacity(double(1) / std::numeric_limits::max() + double(1) / std::numeric_limits::max()); - auto req = std::make_unique(cap, [&local, loc, cap] { + auto req = std::make_unique(100, [&local, loc] { local.executed++; - local.queue(loc).notify_request_finished(cap); }); - local.queue(loc).queue(cid, req->ent); + local.queue(loc).queue(cid, *req); req.release(); return make_ready_future<>(); }); @@ -121,11 +112,7 @@ future<> perf_fair_queue::test(bool loc) { local.executed = 0; return do_until([&local] { return local.executed == requests_to_dispatch; }, [&local, loc] { - local.queue(loc).dispatch_requests([] (fair_queue_entry& ent) { - local_fq_entry* le = boost::intrusive::get_parent_from_member(&ent, &local_fq_entry::ent); - le->submit(); - delete le; - }); + local.queue(loc).dispatch_requests(); return make_ready_future<>(); }); }); diff --git a/tests/unit/fair_queue_test.cc b/tests/unit/fair_queue_test.cc index bbcf5f7942c..aeb142e285d 100644 --- a/tests/unit/fair_queue_test.cc +++ b/tests/unit/fair_queue_test.cc @@ -36,38 +36,34 @@ using namespace seastar; using namespace std::chrono_literals; -struct request { - fair_queue_entry fqent; +struct request final : public fair_queue_entry { std::function handle; unsigned index; template request(fair_queue_entry::capacity_t cap, unsigned index, Func&& h) - : fqent(cap) + : fair_queue_entry(cap) , handle(std::move(h)) , index(index) {} - void submit() { + virtual throttle::grab_result can_dispatch() const noexcept override { + return throttle::grab_result::grabbed; + } + + virtual void dispatch() noexcept override { handle(*this); delete this; } }; class test_env { - fair_group _fg; fair_queue _fq; std::vector _results; std::vector> _exceptions; fair_queue::class_id _nr_classes = 0; std::vector _inflight; - static fair_group::config fg_config(unsigned cap) { - fair_group::config cfg; - cfg.rate_limit_duration = std::chrono::microseconds(cap); - return cfg; - } - static fair_queue::config fq_config() { fair_queue::config cfg; cfg.tau = 
std::chrono::microseconds(50); @@ -75,12 +71,13 @@ class test_env { } void drain() { - do {} while (tick() != 0); + while (tick()) { + continue; + } } public: test_env(unsigned capacity) - : _fg(fg_config(capacity), 1) - , _fq(_fg, fq_config()) + : _fq(fq_config()) {} // As long as there is a request sitting in the queue, tick() will process @@ -90,27 +87,23 @@ class test_env { // Because of this property, one useful use of tick() is to implement a drain() // method (see above) in which all requests currently sent to the queue are drained // before the queue is destroyed. - unsigned tick(unsigned n = 1) { + unsigned tick(unsigned n = 0) { unsigned processed = 0; - _fg.replenish_capacity(_fg.replenished_ts() + std::chrono::microseconds(1)); - _fq.dispatch_requests([] (fair_queue_entry& ent) { - boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit(); - }); + while (true) { + _fq.dispatch_requests(); - for (unsigned i = 0; i < n; ++i) { std::vector curr; curr.swap(_inflight); for (auto& req : curr) { + if (processed < n) { + _results[req.index]++; + } processed++; - _results[req.index]++; - _fq.notify_request_finished(req.fqent.capacity()); } - - _fg.replenish_capacity(_fg.replenished_ts() + std::chrono::microseconds(1)); - _fq.dispatch_requests([] (fair_queue_entry& ent) { - boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit(); - }); + if (processed >= n) { + break; + } } return processed; } @@ -131,18 +124,16 @@ class test_env { void do_op(fair_queue::class_id id, unsigned weight) { unsigned index = id; - auto cap = _fq.tokens_capacity(double(weight) / 1'000'000); - auto req = std::make_unique(cap, index, [this, index] (request& req) mutable noexcept { + auto req = std::make_unique(weight * 1000, index, [this, index] (request& req) mutable noexcept { try { _inflight.push_back(std::move(req)); } catch (...) { auto eptr = std::current_exception(); _exceptions[index].push_back(eptr); - _fq.notify_request_finished(req.fqent.capacity()); } }); - _fq.queue(id, req->fqent); + _fq.queue(id, *req); req.release(); } diff --git a/tests/unit/io_queue_test.cc b/tests/unit/io_queue_test.cc index 9c5cd4f51f8..f30289beda2 100644 --- a/tests/unit/io_queue_test.cc +++ b/tests/unit/io_queue_test.cc @@ -85,7 +85,7 @@ struct io_queue_for_tests { } void kick() { - for (auto&& fg : group->_fgs) { + for (auto&& fg : group->_throttle) { fg.replenish_capacity(std::chrono::steady_clock::now()); } }
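// ---------------------------------------------------------------------------
// [Editorial sketch, not part of the patch] How the refactored pieces fit
// together for a single stream, loosely following io_queue::io_queue() and
// the updated tests above.  The `demo` namespace, `entry` type and the token
// cost value are hypothetical; only the seastar types come from this patch.
#include <seastar/core/fair_queue.hh>
#include <seastar/core/internal/throttle.hh>

namespace demo {

// A request type now derives from fair_queue_entry and supplies the two
// virtuals itself, instead of the queue calling back into the io_queue.
struct entry final : public seastar::fair_queue_entry {
    seastar::throttle& thr;          // the stream's per-shard throttle

    entry(seastar::throttle& t, capacity_t cap) noexcept
        : fair_queue_entry(cap), thr(t) {}

    seastar::throttle::grab_result can_dispatch() const noexcept override {
        return thr.grab_capacity(capacity());    // consult the shared bucket
    }
    void dispatch() override {
        // perform the work here; queued entries own their own lifetime
        delete this;
    }
};

// `group` would be the one shared_throttle per io_group, constructed as
// shared_throttle(cfg, nr_queues) elsewhere.
inline void one_stream_tick(seastar::shared_throttle& group) {
    seastar::fair_queue fq{seastar::fair_queue::config{}};   // per-shard ordering by shares
    seastar::throttle thr{group};                            // per-shard view of the group

    fq.register_priority_class(0, 100);
    fq.queue(0, *new entry(thr, thr.tokens_capacity(0.001)));
    fq.dispatch_requests();   // dispatches entries whose can_dispatch() returns grabbed;
                              // anything still pending stays queued for the next tick
}

} // namespace demo
// ---------------------------------------------------------------------------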