Skip to content

Commit

Permalink
Merge 'Unfriend reactor from timer' from Pavel Emelyanov
Browse files Browse the repository at this point in the history
Reactor messes with private timer fields, but it doesn't really need to. All the code that does so naturally belongs to timer_set class, not reactor itself.

Closes scylladb#2269

* github.com:scylladb/seastar:
  timer: Unfriend reactor
  reactor: Generalize timer removal
  timer: Add type alias for timer_set
  reactor: Move reactor::complete_timers() to timer_set
  • Loading branch information
avikivity committed Jun 17, 2024
2 parents 2b7bef1 + 249eae5 commit 903e413
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 61 deletions.
14 changes: 6 additions & 8 deletions include/seastar/core/reactor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,12 @@ private:
std::unique_ptr<internal::cpu_stall_detector> _cpu_stall_detector;

unsigned _max_task_backlog = 1000;
timer_set<timer<>, &timer<>::_link> _timers;
timer_set<timer<>, &timer<>::_link>::timer_list_t _expired_timers;
timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link> _lowres_timers;
timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link>::timer_list_t _expired_lowres_timers;
timer_set<timer<manual_clock>, &timer<manual_clock>::_link> _manual_timers;
timer_set<timer<manual_clock>, &timer<manual_clock>::_link>::timer_list_t _expired_manual_timers;
timer<>::set_t _timers;
timer<>::set_t::timer_list_t _expired_timers;
timer<lowres_clock>::set_t _lowres_timers;
timer<lowres_clock>::set_t::timer_list_t _expired_lowres_timers;
timer<manual_clock>::set_t _manual_timers;
timer<manual_clock>::set_t::timer_list_t _expired_manual_timers;
io_stats _io_stats;
uint64_t _fsyncs = 0;
uint64_t _cxx_exceptions = 0;
Expand Down Expand Up @@ -355,8 +355,6 @@ private:
void expire_manual_timers() noexcept;
void start_aio_eventfd_loop();
void stop_aio_eventfd_loop();
template <typename T, typename E, typename EnableFunc>
void complete_timers(T&, E&, EnableFunc&& enable_fn) noexcept(noexcept(enable_fn()));

/**
* Returns TRUE if all pollers allow blocking.
Expand Down
48 changes: 48 additions & 0 deletions include/seastar/core/timer-set.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@

namespace seastar {

namespace internal {
void log_timer_callback_exception(std::exception_ptr) noexcept;
}

/**
* A data structure designed for holding and expiring timers. It's
* optimized for timer non-delivery by deferring sorting cost until
Expand Down Expand Up @@ -85,6 +89,7 @@ private:
{
return bitsets::get_last_set(_non_empty_buckets);
}

public:
timer_set() noexcept
: _last(0)
Expand Down Expand Up @@ -155,6 +160,19 @@ public:
}
}

/**
* Removes timer from the active set or the expired list, if the timer is expired
*/
void remove(Timer& timer, timer_list_t& expired) noexcept
{
if (timer._expired) {
expired.erase(expired.iterator_to(timer));
timer._expired = false;
} else {
remove(timer);
}
}

/**
* Expires active timers.
*
Expand Down Expand Up @@ -209,6 +227,36 @@ public:
return exp;
}

template <typename EnableFunc>
void complete(timer_list_t& expired_timers, EnableFunc&& enable_fn) noexcept(noexcept(enable_fn())) {
expired_timers = expire(this->now());
for (auto& t : expired_timers) {
t._expired = true;
}
const auto prev_sg = current_scheduling_group();
while (!expired_timers.empty()) {
auto t = &*expired_timers.begin();
expired_timers.pop_front();
t->_queued = false;
if (t->_armed) {
t->_armed = false;
if (t->_period) {
t->readd_periodic();
}
try {
*internal::current_scheduling_group_ptr() = t->_sg;
t->_callback();
} catch (...) {
internal::log_timer_callback_exception(std::current_exception());
}
}
}
// complete_timers() can be called from the context of run_tasks()
// as well so we need to restore the previous scheduling group (set by run_tasks()).
*internal::current_scheduling_group_ptr() = prev_sg;
enable_fn();
}

/**
* Returns a time point at which expire() should be called
* in order to ensure timers are expired in a timely manner.
Expand Down
3 changes: 2 additions & 1 deletion include/seastar/core/timer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,9 @@ public:
time_point get_timeout() const noexcept {
return _expiry;
}
friend class reactor;

friend class timer_set<timer, &timer::_link>;
using set_t = timer_set<timer, &timer::_link>;
};

extern template class timer<steady_clock_type>;
Expand Down
63 changes: 11 additions & 52 deletions src/core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1524,36 +1524,6 @@ reactor::block_notifier(int) {
engine()._cpu_stall_detector->on_signal();
}

template <typename T, typename E, typename EnableFunc>
void reactor::complete_timers(T& timers, E& expired_timers, EnableFunc&& enable_fn) noexcept(noexcept(enable_fn())) {
expired_timers = timers.expire(timers.now());
for (auto& t : expired_timers) {
t._expired = true;
}
const auto prev_sg = current_scheduling_group();
while (!expired_timers.empty()) {
auto t = &*expired_timers.begin();
expired_timers.pop_front();
t->_queued = false;
if (t->_armed) {
t->_armed = false;
if (t->_period) {
t->readd_periodic();
}
try {
*internal::current_scheduling_group_ptr() = t->_sg;
t->_callback();
} catch (...) {
seastar_logger.error("Timer callback failed: {}", std::current_exception());
}
}
}
// complete_timers() can be called from the context of run_tasks()
// as well so we need to restore the previous scheduling group (set by run_tasks()).
*internal::current_scheduling_group_ptr() = prev_sg;
enable_fn();
}

#ifdef HAVE_OSV
void reactor::timer_thread_func() {
sched::timer tmr(*sched::thread::current());
Expand All @@ -1566,7 +1536,7 @@ void reactor::timer_thread_func() {
_timer_due = 0;
_engine_thread->unsafe_stop();
_pending_tasks.push_front(make_task(default_scheduling_group(), [this] {
complete_timers(_timers, _expired_timers, [this] {
_timers.complete(_expired_timers, [this] {
if (!_timers.empty()) {
enable_timer(_timers.get_next_timeout());
}
Expand Down Expand Up @@ -2495,12 +2465,7 @@ bool reactor::queue_timer(timer<steady_clock_type>* tmr) noexcept {
}

void reactor::del_timer(timer<steady_clock_type>* tmr) noexcept {
if (tmr->_expired) {
_expired_timers.erase(_expired_timers.iterator_to(*tmr));
tmr->_expired = false;
} else {
_timers.remove(*tmr);
}
_timers.remove(*tmr, _expired_timers);
}

void reactor::add_timer(timer<lowres_clock>* tmr) noexcept {
Expand All @@ -2514,12 +2479,7 @@ bool reactor::queue_timer(timer<lowres_clock>* tmr) noexcept {
}

void reactor::del_timer(timer<lowres_clock>* tmr) noexcept {
if (tmr->_expired) {
_expired_lowres_timers.erase(_expired_lowres_timers.iterator_to(*tmr));
tmr->_expired = false;
} else {
_lowres_timers.remove(*tmr);
}
_lowres_timers.remove(*tmr, _expired_lowres_timers);
}

void reactor::add_timer(timer<manual_clock>* tmr) noexcept {
Expand All @@ -2531,12 +2491,7 @@ bool reactor::queue_timer(timer<manual_clock>* tmr) noexcept {
}

void reactor::del_timer(timer<manual_clock>* tmr) noexcept {
if (tmr->_expired) {
_expired_manual_timers.erase(_expired_manual_timers.iterator_to(*tmr));
tmr->_expired = false;
} else {
_manual_timers.remove(*tmr);
}
_manual_timers.remove(*tmr, _expired_manual_timers);
}

void reactor::at_exit(noncopyable_function<future<> ()> func) {
Expand Down Expand Up @@ -2748,7 +2703,7 @@ bool
reactor::do_expire_lowres_timers() noexcept {
auto now = lowres_clock::now();
if (now >= _lowres_next_timeout) {
complete_timers(_lowres_timers, _expired_lowres_timers, [this] () noexcept {
_lowres_timers.complete(_expired_lowres_timers, [this] () noexcept {
if (!_lowres_timers.empty()) {
_lowres_next_timeout = _lowres_timers.get_next_timeout();
} else {
Expand All @@ -2762,7 +2717,7 @@ reactor::do_expire_lowres_timers() noexcept {

void
reactor::expire_manual_timers() noexcept {
complete_timers(_manual_timers, _expired_manual_timers, [] () noexcept {});
_manual_timers.complete(_expired_manual_timers, [] () noexcept {});
}

void
Expand Down Expand Up @@ -3205,7 +3160,7 @@ reactor::activate(task_queue& tq) {
}

void reactor::service_highres_timer() noexcept {
complete_timers(_timers, _expired_timers, [this] () noexcept {
_timers.complete(_expired_timers, [this] () noexcept {
if (!_timers.empty()) {
enable_timer(_timers.get_next_timeout());
}
Expand Down Expand Up @@ -5187,6 +5142,10 @@ run_in_background(future<> f) {
engine().run_in_background(std::move(f));
}

void log_timer_callback_exception(std::exception_ptr ex) noexcept {
seastar_logger.error("Timer callback failed: {}", std::current_exception());
}

}

#ifdef SEASTAR_TASK_BACKTRACE
Expand Down

0 comments on commit 903e413

Please sign in to comment.