repair: Reduce repair reader eviction with diff shard count
When repair master and followers have different shard count, the repair
followers need to create multi-shard readers. Each multi-shard reader
will create one local reader on each shard, N (smp::count) local readers
in total.

There is a hard limit on the number of readers that can work in parallel.
When there are more readers than this limit, the readers start to evict
each other, causing buffers already read from disk to be dropped and
readers to be re-created, which is inefficient.
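
As a rough, hypothetical illustration of the blow-up (the numbers here are
made up, not taken from the tests below): suppose the reader limit is 10 per
shard and 40 ranges are being repaired against a follower at the same time.
With single-shard readers those 40 readers are spread across the follower's
shards, but with multi-shard readers every one of the 40 places a local
reader on every shard, so each shard sees 40 readers against a limit of 10
and spends its time evicting and re-creating them.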

To reduce this reader-eviction overhead, a global reader permit is
introduced that accounts for the extra local readers created by
multi-shard readers.

With this patch, at any point in time, the number of readers created by
repair will not exceed the reader limit.
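
For readers who do not want to parse the diff below, here is a minimal,
simplified sketch of the acquisition scheme. The names reader_limiter and
take_reader_permits, and the hard-coded budget of 10, are illustrative
placeholders; the actual implementation is get_global_reader_permit() in
repair/row_level.cc, together with the new _reader_sem and _lock_sem members
of repair_service:

    #include <seastar/core/coroutine.hh>
    #include <seastar/core/semaphore.hh>
    #include <seastar/core/sharded.hh>
    #include <seastar/core/shared_ptr.hh>
    #include <seastar/core/smp.hh>
    #include <vector>

    // One limiter instance per shard; the coordinator lock is only ever used
    // on the last shard.
    struct reader_limiter : seastar::peering_sharded_service<reader_limiter> {
        seastar::semaphore readers{10}; // per-shard reader budget
        seastar::semaphore lock{1};     // serializes multi-shard acquisitions
    };

    using permit_vector = std::vector<
            seastar::foreign_ptr<seastar::lw_shared_ptr<seastar::semaphore_units<>>>>;

    // Local reader: one unit on `shard`. Multi-shard reader: one unit on every
    // shard. All acquisitions funnel through the last shard while holding its
    // lock, so two multi-shard acquisitions cannot interleave and end up each
    // holding only part of the permits they need.
    seastar::future<permit_vector> take_reader_permits(unsigned shard, bool all_shards,
            seastar::sharded<reader_limiter>& limiters) {
        permit_vector permits(seastar::smp::count);
        co_await limiters.invoke_on(seastar::smp::count - 1,
                [&permits, shard, all_shards] (reader_limiter& coord) -> seastar::future<> {
            co_await seastar::with_semaphore(coord.lock, 1,
                    [&coord, &permits, shard, all_shards] () -> seastar::future<> {
                co_await coord.container().invoke_on_all(
                        [&permits, shard, all_shards] (reader_limiter& local) -> seastar::future<> {
                    if (all_shards || seastar::this_shard_id() == shard) {
                        auto units = co_await seastar::get_units(local.readers, 1);
                        permits[seastar::this_shard_id()] = seastar::make_foreign(
                                seastar::make_lw_shared<seastar::semaphore_units<>>(std::move(units)));
                    }
                });
            });
        });
        co_return permits;
    }

A caller keeps the returned vector alive for the lifetime of its reader;
dropping it forwards each foreign_ptr's destruction back to its owning
shard, which returns that shard's unit to the semaphore.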

Test Results:

1) with stream sem 10, repair global sem 10, 5 ranges in parallel, n1=2
shards, n2=8 shards, memory wanted =1

1.1)
[asias@hjpc2 mycluster]$ time nodetool -p 7200 repair ks2  (repair on n2)
[2022-11-23 17:45:24,770] Starting repair command #1, repairing 1
ranges for keyspace ks2 (parallelism=SEQUENTIAL, full=true)
[2022-11-23 17:45:53,869] Repair session 1
[2022-11-23 17:45:53,869] Repair session 1 finished

real    0m30.212s
user    0m1.680s
sys     0m0.222s

1.2)
[asias@hjpc2 mycluster]$ time nodetool  repair ks2  (repair on n1)
[2022-11-23 17:46:07,507] Starting repair command #1, repairing 1
ranges for keyspace ks2 (parallelism=SEQUENTIAL, full=true)
[2022-11-23 17:46:30,608] Repair session 1
[2022-11-23 17:46:30,608] Repair session 1 finished

real    0m24.241s
user    0m1.731s
sys     0m0.213s

2) with stream sem 10, repair global sem no_limit, 5 ranges in
parallel, n1=2 shards, n2=8 shards, memory wanted =1

2.1)
[asias@hjpc2 mycluster]$ time nodetool -p 7200 repair ks2 (repair on n2)
[2022-11-23 17:49:49,301] Starting repair command #1, repairing 1
ranges for keyspace ks2 (parallelism=SEQUENTIAL, full=true)
[2022-11-23 17:52:01,414] Repair session 1
[2022-11-23 17:52:01,415] Repair session 1 finished

real    2m13.227s
user    0m1.752s
sys     0m0.218s

2.2)
[asias@hjpc2 mycluster]$ time nodetool  repair ks2 (repair on n1)
[2022-11-23 17:52:19,280] Starting repair command #1, repairing 1
ranges for keyspace ks2 (parallelism=SEQUENTIAL, full=true)
[2022-11-23 17:52:42,387] Repair session 1
[2022-11-23 17:52:42,387] Repair session 1 finished

real    0m24.196s
user    0m1.689s
sys     0m0.184s

Comparing 1.1) and 2.1) shows that reader eviction plays the major role here.
The patch gives a 2m13s / 30s ≈ 4.4X speed up in this setup.

Comparing 1.1) and 1.2) shows that, even with the reader limit in place,
starting the repair on the node with fewer shards is faster: 30s / 24s =
1.25X (the total number of multi-shard readers is lower).

Fixes scylladb#12157

Closes scylladb#12158
asias authored and denesb committed Dec 5, 2022
1 parent 1e20095 commit c6087cf
Showing 2 changed files with 47 additions and 1 deletion.
44 changes: 43 additions & 1 deletion repair/row_level.cc
@@ -676,6 +676,36 @@ void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_
}
}

// For a local reader: a permit is taken on the local shard.
// For a multi-shard reader: a permit is taken on each shard, smp::count
// permits are taken in total.
struct repair_reader_permit_meta {
    std::vector<foreign_ptr<lw_shared_ptr<semaphore_units<>>>> permits{smp::count};
};

// If all_shards is true, a permit is taken on each shard.
// If all_shards is false, a single permit is taken on the specified shard.
future<repair_reader_permit_meta> get_global_reader_permit(repair_service& rs, unsigned shard, bool all_shards) {
    repair_reader_permit_meta meta;
    // Taking the permits has to be serialized, so it is done on a single
    // shard. The last shard is chosen as the coordinator shard.
    co_await rs.container().invoke_on(smp::count - 1, [&meta, shard, all_shards] (repair_service& rs) -> future<> {
        co_await with_semaphore(rs.lock_sem(), 1, [&rs, &meta, shard, all_shards] () -> future<> {
            co_await rs.container().invoke_on_all([&meta, shard, all_shards] (repair_service& rs) -> future<> {
                if (all_shards || shard == this_shard_id()) {
                    auto& reader_sem = rs.reader_sem();
                    auto permit = co_await seastar::get_units(reader_sem, 1);
                    auto ptr = make_lw_shared<semaphore_units<>>(std::move(permit));
                    meta.permits[this_shard_id()] = make_foreign(std::move(ptr));
                }
                co_return;
            });
        });
    });
    co_return meta;
};

class repair_meta {
friend repair_meta_tracker;
public:
@@ -714,6 +744,7 @@ class repair_meta {
// follower nr peers is always one because repair master is the only peer.
size_t _nr_peer_nodes= 1;
repair_stats _stats;
bool _is_local_reader;
repair_reader _repair_reader;
lw_shared_ptr<repair_writer> _repair_writer;
// Contains rows read from disk
@@ -739,6 +770,7 @@
is_dirty_on_master _dirty_on_master = is_dirty_on_master::no;
std::optional<shared_future<>> _stopped;
repair_hasher _repair_hasher;
std::optional<repair_reader_permit_meta> _reader_permit;
public:
std::vector<repair_node_state>& all_nodes() {
return _all_node_states;
@@ -814,6 +846,7 @@ class repair_meta {
, _remote_sharder(make_remote_sharder())
, _same_sharding_config(is_same_sharding_config())
, _nr_peer_nodes(nr_peer_nodes)
, _is_local_reader(_repair_master || _same_sharding_config)
, _repair_reader(
_db,
_cf,
@@ -1050,7 +1083,12 @@ class repair_meta {
future<std::tuple<std::list<repair_row>, size_t>>
read_rows_from_disk(size_t cur_size) {
using value_type = std::tuple<std::list<repair_row>, size_t>;
return do_with(cur_size, size_t(0), std::list<repair_row>(), [this] (size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
if (!_reader_permit) {
bool all_shards = !_is_local_reader;
auto permit = co_await get_global_reader_permit(_rs, this_shard_id(), all_shards);
_reader_permit = std::optional<repair_reader_permit_meta>(std::move(permit));
}
auto ret = co_await do_with(cur_size, size_t(0), std::list<repair_row>(), [this] (size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
return repeat([this, &cur_size, &cur_rows, &new_rows_size] () mutable {
if (cur_size >= _max_row_buf_size) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
@@ -1074,6 +1112,7 @@
return make_ready_future<value_type>(value_type(std::move(cur_rows), new_rows_size));
});
});
co_return ret;
}

future<> clear_row_buf() {
@@ -2934,6 +2973,9 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper,
, _node_ops_metrics(_repair_module)
, _max_repair_memory(max_repair_memory)
, _memory_sem(max_repair_memory)
// The "10" below should be the same mas max_count_streaming_concurrent_reads.
// FIXME: use that named constant instead of the number here.
, _reader_sem(10)
{
tm.register_module("repair", _repair_module);
if (this_shard_id() == 0) {
4 changes: 4 additions & 0 deletions repair/row_level.hh
@@ -104,6 +104,8 @@ class repair_service : public seastar::peering_sharded_service<repair_service> {

size_t _max_repair_memory;
seastar::semaphore _memory_sem;
seastar::semaphore _reader_sem;
seastar::semaphore _lock_sem{1};

future<> init_ms_handlers();
future<> uninit_ms_handlers();
@@ -172,6 +174,8 @@
gms::gossiper& get_gossiper() noexcept { return _gossiper.local(); }
size_t max_repair_memory() const { return _max_repair_memory; }
seastar::semaphore& memory_sem() { return _memory_sem; }
seastar::semaphore& reader_sem() { return _reader_sem; }
seastar::semaphore& lock_sem() { return _lock_sem; }
repair_module& get_repair_module() noexcept {
return *_repair_module;
}
