Skip to content

Commit

Permalink
abort_source: subscription: keep callback function alive after abort
Browse files Browse the repository at this point in the history
Change 470b539 causes
a regression in scylla when the subscription callback
function is destroyed after being called when abort
is requested.

The app expects this function to remain alive throughout
the life time of the subscription object holding it,
so seastar should not breask this assumption.

This commit keeps it around by adding a boolean
`_aborted` member to subscription used to make
sure the callback is called at most once,
but otherwise, the function is not destroyed
until the whole sunscription is destroyed.

Added unit test to check that.

Signed-off-by: Benny Halevy <[email protected]>

Closes #2360
  • Loading branch information
bhalevy authored and xemul committed Jul 24, 2024
1 parent 4ba9b9c commit 8ecce18
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 6 deletions.
15 changes: 9 additions & 6 deletions include/seastar/core/abort_source.hh
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public:
class subscription : public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
friend class abort_source;

subscription_callback_type _target = noop_handler;
subscription_callback_type _target;
bool _aborted = false;

explicit subscription(abort_source& as, subscription_callback_type target)
: _target(std::move(target)) {
Expand All @@ -84,30 +85,32 @@ public:
}

public:
static void noop_handler(const std::optional<std::exception_ptr>&) noexcept {}

/// Call the subscribed callback (at most once).
/// This method is called by the \ref abort_source on all listed \ref subscription objects
/// when \ref request_abort() is called.
/// It may be called indepdently by the user at any time, causing the \ref subscription
/// to be unlinked from the \ref abort_source subscriptions list.
void on_abort(const std::optional<std::exception_ptr>& ex) noexcept {
unlink();
auto target = std::exchange(_target, noop_handler);
target(ex);
if (!std::exchange(_aborted, true)) {
_target(ex);
}
}

public:
subscription() = default;

subscription(subscription&& other) noexcept(std::is_nothrow_move_constructible_v<subscription_callback_type>)
: _target(std::move(other._target)) {
: _target(std::move(other._target))
, _aborted(std::exchange(other._aborted, true))
{
subscription_list_type::node_algorithms::swap_nodes(other.this_ptr(), this_ptr());
}

subscription& operator=(subscription&& other) noexcept(std::is_nothrow_move_assignable_v<subscription_callback_type>) {
if (this != &other) {
_target = std::move(other._target);
_aborted = std::exchange(other._aborted, true);
unlink();
subscription_list_type::node_algorithms::swap_nodes(other.this_ptr(), this_ptr());
}
Expand Down
32 changes: 32 additions & 0 deletions tests/unit/abort_source_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <seastar/core/gate.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/do_with.hh>
#include <seastar/util/defer.hh>

using namespace seastar;
using namespace std::chrono_literals;
Expand Down Expand Up @@ -265,3 +266,34 @@ SEASTAR_THREAD_TEST_CASE(test_subscribe_aborted_source) {
sub->on_abort(std::make_exception_ptr(std::runtime_error("signaled")));
BOOST_REQUIRE(signalled_ex == nullptr);
}

SEASTAR_THREAD_TEST_CASE(test_subscription_callback_lifetime) {
// The subscription callback function needs to be destroyed
// only when the subscription is destroyed.
bool callback_destroyed = false;
int callback_called = 0;
auto when_destroyed = deferred_action([&callback_destroyed] () noexcept { callback_destroyed = true; });
auto as = abort_source();
auto sub = std::make_unique<optimized_optional<abort_source::subscription>>(as.subscribe([&, when_destroyed = std::move(when_destroyed)] (const std::optional<std::exception_ptr>& ex) noexcept {
callback_called++;
}));
BOOST_REQUIRE_EQUAL(bool(sub), true);
BOOST_REQUIRE_EQUAL(bool(*sub), true);
BOOST_REQUIRE_EQUAL(callback_destroyed, false);
BOOST_REQUIRE_EQUAL(callback_called, 0);

// on_abort should trigger the subscribed callback
as.request_abort_ex(std::make_exception_ptr(std::runtime_error("signaled")));
BOOST_REQUIRE_EQUAL(bool(*sub), false);
BOOST_REQUIRE_EQUAL(callback_destroyed, false);
BOOST_REQUIRE_EQUAL(callback_called, 1);

// on_abort is single-shot
(*sub)->on_abort(std::make_exception_ptr(std::runtime_error("signaled")));
BOOST_REQUIRE_EQUAL(callback_destroyed, false);
BOOST_REQUIRE_EQUAL(callback_called, 1);

sub.reset();
BOOST_REQUIRE_EQUAL(callback_destroyed, true);
BOOST_REQUIRE_EQUAL(callback_called, 1);
}

0 comments on commit 8ecce18

Please sign in to comment.