Skip to content

Commit

Permalink
Merge pull request #18305 from ztlpn/fix-balancer-memory-corruption
Browse files Browse the repository at this point in the history
Fix some concurrent memory access problems in partition balancer
  • Loading branch information
piyushredpanda committed May 10, 2024
2 parents 0cc4a56 + 1c63990 commit 32ad2d8
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 27 deletions.
16 changes: 15 additions & 1 deletion src/v/cluster/scheduling/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "cluster/logger.h"
#include "cluster/scheduling/allocation_state.h"
#include "utils/exceptions.h"
#include "utils/to_string.h"

#include <fmt/ostream.h>
Expand Down Expand Up @@ -47,6 +48,9 @@ allocation_units::allocation_units(

allocation_units::~allocation_units() {
oncore_debug_verify(_oncore);
if (unlikely(!_state)) {
return;
}
for (const auto& replica : _added_replicas) {
_state->remove_allocation(replica, _domain);
_state->remove_final_count(replica, _domain);
Expand Down Expand Up @@ -80,6 +84,11 @@ allocated_partition::prepare_move(model::node_id prev_node) const {

model::broker_shard allocated_partition::add_replica(
model::node_id node, const std::optional<previous_replica>& prev) {
if (unlikely(!_state)) {
throw concurrent_modification_error(
"allocation_state was concurrently replaced");
}

if (!_original_node2shard) {
_original_node2shard.emplace();
for (const auto& bs : _replicas) {
Expand Down Expand Up @@ -155,7 +164,12 @@ bool allocated_partition::is_original(model::node_id node) const {
}

errc allocated_partition::try_revert(const reallocation_step& step) {
if (!_original_node2shard || !_state) {
if (unlikely(!_state)) {
throw concurrent_modification_error(
"allocation_state was concurrently replaced");
}

if (!_original_node2shard) {
return errc::no_update_in_progress;
}

Expand Down
4 changes: 3 additions & 1 deletion src/v/cluster/tests/partition_balancer_planner_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,16 @@ struct partition_balancer_planner_fixture {
cluster::partition_balancer_planner make_planner(
model::partition_autobalancing_mode mode
= model::partition_autobalancing_mode::continuous,
size_t max_concurrent_actions = 2) {
size_t max_concurrent_actions = 2,
bool request_ondemand_rebalance = false) {
return cluster::partition_balancer_planner(
cluster::planner_config{
.mode = mode,
.soft_max_disk_usage_ratio = 0.8,
.hard_max_disk_usage_ratio = 0.95,
.max_concurrent_actions = max_concurrent_actions,
.node_availability_timeout_sec = std::chrono::minutes(1),
.ondemand_rebalance_requested = request_ondemand_rebalance,
.segment_fallocation_step = 16,
.node_responsiveness_timeout = std::chrono::seconds(10),
.topic_aware = true,
Expand Down
142 changes: 141 additions & 1 deletion src/v/cluster/tests/partition_balancer_planner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
// by the Apache License, Version 2.0

#include "base/vlog.h"
#include "cluster/controller_snapshot.h"
#include "cluster/health_monitor_types.h"
#include "cluster/tests/partition_balancer_planner_fixture.h"
#include "utils/stable_iterator_adaptor.h"

#include <seastar/testing/thread_test_case.hh>
#include <seastar/util/defer.hh>

static ss::logger logger("partition_balancer_planner");
static ss::logger logger("pb_planner_test");

// a shorthand to avoid spelling out model::node_id
static model::node_id n(model::node_id::type id) { return model::node_id{id}; };
Expand Down Expand Up @@ -924,3 +927,140 @@ FIXTURE_TEST(balancing_modes, partition_balancer_planner_fixture) {
BOOST_REQUIRE_EQUAL(plan_data.cancellations.size(), 0);
BOOST_REQUIRE_EQUAL(plan_data.failed_actions_count, 0);
}

FIXTURE_TEST(
concurrent_topic_table_updates, partition_balancer_planner_fixture) {
// Apply lots of topic_table update commands, while concurrently invoking
// the planner. The main goal of this test is to pass ASan checks.

allocator_register_nodes(5);
config::shard_local_cfg().disable_metrics.set_value(true);
config::shard_local_cfg().disable_public_metrics.set_value(true);

auto make_create_tp_cmd = [this](ss::sstring name, int partitions) {
int16_t replication_factor = 3;
cluster::topic_configuration cfg(
test_ns, model::topic{name}, partitions, replication_factor);

ss::chunked_fifo<cluster::partition_assignment> assignments;
for (model::partition_id::type i = 0; i < partitions; ++i) {
std::vector<model::broker_shard> replicas;
for (int r = 0; r < replication_factor; ++r) {
replicas.push_back(model::broker_shard{
model::node_id{r},
random_generators::get_int<uint32_t>(0, 3)});
}
std::shuffle(
replicas.begin(),
replicas.end(),
random_generators::internal::gen);

assignments.push_back(cluster::partition_assignment{
raft::group_id{1}, model::partition_id{i}, replicas});
}
return cluster::create_topic_cmd{
make_tp_ns(name),
cluster::topic_configuration_assignment(cfg, std::move(assignments))};
};

size_t successes = 0;
size_t failures = 0;
size_t reassignments = 0;
bool should_stop = false;
ss::future<> planning_fiber = ss::async([&] {
while (!should_stop) {
vlog(logger.trace, "planning fiber: invoking...");
auto hr = create_health_report();
auto planner = make_planner(
model::partition_autobalancing_mode::node_add, 50, true);

try {
auto plan_data = planner.plan_actions(hr, as).get();
successes += 1;
reassignments += plan_data.reassignments.size();
} catch (concurrent_modification_error&) {
failures += 1;
}
vlog(logger.trace, "planning fiber: iteration done");
}
});
auto deferred = ss::defer([&] {
if (!should_stop) {
should_stop = true;
planning_fiber.get();
}
});

cluster::topic_table other_tt;
model::offset controller_offset{0};
std::set<ss::sstring> cur_topics;
bool node_isolated = false;

for (size_t iter = 0; iter < 1'000; ++iter) {
int random_val = random_generators::get_int(0, 10);
if (random_val == 10) {
// allow the planner to make some progress
ss::sleep(50ms).get();
continue;
}

// randomly create and delete topics
auto topic = ssx::sformat("topic_{}", random_val);
if (!cur_topics.contains(topic)) {
vlog(
logger.trace,
"modifying fiber: creating topic {} (isolated: {})",
topic,
node_isolated);
auto cmd = make_create_tp_cmd(
topic, random_generators::get_int(1, 20));
other_tt.apply(cmd, controller_offset).get();
if (!node_isolated) {
workers.dispatch_topic_command(cmd);
}
cur_topics.insert(topic);
} else {
vlog(
logger.trace,
"modifying fiber: deleting topic {} (isolated: {})",
topic,
node_isolated);
cluster::delete_topic_cmd cmd{make_tp_ns(topic), make_tp_ns(topic)};
other_tt.apply(cmd, controller_offset).get();
if (!node_isolated) {
workers.dispatch_topic_command(cmd);
}
cur_topics.erase(topic);
}

if (random_generators::get_int(5) == 0) {
// flip node_isolated flag

if (node_isolated) {
// simulate node coming back from isolation and recovering
// current controller state from a snapshot.
vlog(logger.trace, "modifying fiber: applying snapshot");
node_isolated = false;
cluster::controller_snapshot snap;
other_tt.fill_snapshot(snap).get();
workers.members.local().fill_snapshot(snap);
workers.dispatcher.apply_snapshot(controller_offset, snap)
.get();
} else {
node_isolated = true;
}
}

controller_offset += 1;

vlog(logger.trace, "modifying fiber: iteration done");
}

should_stop = true;
planning_fiber.get();

// sanity-check that planning made some progress.
BOOST_REQUIRE(successes > 0);
BOOST_REQUIRE(failures > 0);
BOOST_REQUIRE(reassignments > 0);
}
38 changes: 31 additions & 7 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ topic_table::apply(finish_moving_partition_replicas_cmd cmd, model::offset o) {

_updates_in_progress.erase(it);

_topics_map_revision++;

on_partition_move_finish(cmd.key, cmd.value);

// notify backend about finished update
Expand Down Expand Up @@ -416,6 +418,8 @@ topic_table::apply(cancel_moving_partition_replicas_cmd cmd, model::offset o) {
current_assignment_it->replicas
= in_progress_it->second.get_previous_replicas();

_topics_map_revision++;

_pending_deltas.emplace_back(
std::move(cmd.key),
current_assignment_it->group,
Expand Down Expand Up @@ -459,6 +463,11 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
co_return errc::no_update_in_progress;
}

auto p_meta_it = tp->second.partitions.find(ntp.tp.partition);
if (p_meta_it == tp->second.partitions.end()) {
co_return errc::partition_not_exists;
}

// revert replica set update
current_assignment_it->replicas
= in_progress_it->second.get_target_replicas();
Expand All @@ -469,11 +478,7 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
current_assignment_it->replicas,
};

// update partition_meta object
auto p_meta_it = tp->second.partitions.find(ntp.tp.partition);
if (p_meta_it == tp->second.partitions.end()) {
co_return errc::partition_not_exists;
}
// update partition_meta object:
// the cancellation was reverted and update went through, we must
// update replicas_revisions.
p_meta_it->second.replicas_revisions = update_replicas_revisions(
Expand All @@ -485,6 +490,8 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
/// Since the update is already finished we drop in_progress state
_updates_in_progress.erase(in_progress_it);

_topics_map_revision++;

// notify backend about finished update
_pending_deltas.emplace_back(
ntp,
Expand Down Expand Up @@ -670,6 +677,7 @@ topic_table::apply(set_topic_partitions_disabled_cmd cmd, model::offset o) {
}
}

_topics_map_revision++;
notify_waiters();

co_return errc::success;
Expand Down Expand Up @@ -998,6 +1006,7 @@ class topic_table::snapshot_applier {
disabled_partitions_t& _disabled_partitions;
fragmented_vector<delta>& _pending_deltas;
topic_table_probe& _probe;
model::revision_id& _topics_map_revision;
model::revision_id _snap_revision;

public:
Expand All @@ -1006,14 +1015,17 @@ class topic_table::snapshot_applier {
, _disabled_partitions(parent._disabled_partitions)
, _pending_deltas(parent._pending_deltas)
, _probe(parent._probe)
, _topics_map_revision(parent._topics_map_revision)
, _snap_revision(snap_revision) {}

void delete_ntp(
const model::topic_namespace& ns_tp, const partition_assignment& p_as) {
auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_as.id);
vlog(
clusterlog.trace, "deleting ntp {} not in controller snapshot", ntp);
_updates_in_progress.erase(ntp);
if (_updates_in_progress.erase(ntp)) {
_topics_map_revision++;
};

_pending_deltas.emplace_back(
std::move(ntp),
Expand All @@ -1035,7 +1047,9 @@ class topic_table::snapshot_applier {
delete_ntp(ns_tp, p_as);
co_await ss::coroutine::maybe_yield();
}
_disabled_partitions.erase(ns_tp);
if (_disabled_partitions.erase(ns_tp)) {
_topics_map_revision++;
};
_probe.handle_topic_deletion(ns_tp);
// topic_metadata_item object is supposed to be removed from _topics by
// the caller
Expand All @@ -1050,6 +1064,9 @@ class topic_table::snapshot_applier {
vlog(clusterlog.trace, "adding ntp {} from controller snapshot", ntp);
size_t pending_deltas_start_idx = _pending_deltas.size();

// we are going to modify md_item so increment the revision right away.
_topics_map_revision++;

const model::partition_id p_id = ntp.tp.partition;

// 1. reconcile the _topics state (the md_item object) and generate
Expand Down Expand Up @@ -1191,7 +1208,9 @@ class topic_table::snapshot_applier {
topic_metadata_item ret{topic_metadata{topic.metadata, {}}};
if (topic.disabled_set) {
_disabled_partitions[ns_tp] = *topic.disabled_set;
_topics_map_revision++;
}

for (const auto& [p_id, partition] : topic.partitions) {
auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_id);
add_ntp(ntp, topic, partition, ret, false);
Expand Down Expand Up @@ -1230,6 +1249,7 @@ ss::future<> topic_table::apply_snapshot(
// The topic was re-created, delete and add it anew.
co_await applier.delete_topic(ns_tp, md_item);
md_item = co_await applier.create_topic(ns_tp, topic_snapshot);
_topics_map_revision++;
} else {
// The topic was present in the previous set, now we need to
// reconcile individual partitions.
Expand All @@ -1247,10 +1267,12 @@ ss::future<> topic_table::apply_snapshot(
old_disabled_set = std::exchange(
_disabled_partitions[ns_tp],
*topic_snapshot.disabled_set);
_topics_map_revision++;
} else if (auto it = _disabled_partitions.find(ns_tp);
it != _disabled_partitions.end()) {
old_disabled_set = std::move(it->second);
_disabled_partitions.erase(it);
_topics_map_revision++;
}

// 2. For each partition in the new set, reconcile assignments
Expand Down Expand Up @@ -1288,6 +1310,7 @@ ss::future<> topic_table::apply_snapshot(
if (!topic_snapshot.partitions.contains(as_it_copy->id)) {
applier.delete_ntp(ns_tp, *as_it_copy);
md_item.get_assignments().erase(as_it_copy);
_topics_map_revision++;
}
co_await ss::coroutine::maybe_yield();
}
Expand Down Expand Up @@ -1633,6 +1656,7 @@ void topic_table::change_partition_replicas(
auto previous_assignment = current_assignment.replicas;
// replace partition replica set
current_assignment.replicas = new_assignment;
_topics_map_revision++;

// calculate delta for backend

Expand Down

0 comments on commit 32ad2d8

Please sign in to comment.