server: make gc support multi-threads (#16096) (#16588)
close #16101

Do region GC in parallel and expose the GC thread-count configuration. The configuration can be updated dynamically.

Signed-off-by: Qi Xu <[email protected]>

Co-authored-by: Qi Xu <[email protected]>
Co-authored-by: tonyxuqqi <[email protected]>
3 people committed Mar 1, 2024
1 parent 3443254 commit d18dba7
Showing 5 changed files with 171 additions and 56 deletions.
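
As a rough illustration of the new option, here is a minimal, self-contained sketch of the validation pattern added to src/server/gc_worker/config.rs below. The struct is a simplified stand-in for the real GcConfig (which has more fields and uses TiKV's own error type), so the String error and the main() driver are assumptions for this example only.

struct GcConfig {
    batch_keys: usize,
    num_threads: usize,
}

impl GcConfig {
    fn validate(&self) -> Result<(), String> {
        if self.batch_keys == 0 {
            return Err("gc.batch_keys should not be 0".into());
        }
        // New in this commit: a zero GC thread count is rejected.
        if self.num_threads == 0 {
            return Err("gc.num_threads should not be 0".into());
        }
        Ok(())
    }
}

fn main() {
    let ok = GcConfig { batch_keys: 256, num_threads: 2 };
    assert!(ok.validate().is_ok());

    let bad = GcConfig { batch_keys: 256, num_threads: 0 };
    assert!(bad.validate().is_err());
}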
6 changes: 6 additions & 0 deletions src/server/gc_worker/config.rs
@@ -22,6 +22,8 @@ pub struct GcConfig {
/// greater than 5.0.0. Change `compaction_filter_skip_version_check`
/// can enable it by force.
pub compaction_filter_skip_version_check: bool,
/// Number of threads used to run GC tasks.
pub num_threads: usize,
}

impl Default for GcConfig {
@@ -32,6 +34,7 @@ impl Default for GcConfig {
max_write_bytes_per_sec: ReadableSize(DEFAULT_GC_MAX_WRITE_BYTES_PER_SEC),
enable_compaction_filter: true,
compaction_filter_skip_version_check: false,
num_threads: 1,
}
}
}
@@ -41,6 +44,9 @@ impl GcConfig {
if self.batch_keys == 0 {
return Err("gc.batch_keys should not be 0".into());
}
if self.num_threads == 0 {
return Err("gc.thread_count should not be 0".into());
}
Ok(())
}
}
103 changes: 76 additions & 27 deletions src/server/gc_worker/gc_manager.rs
@@ -4,7 +4,7 @@ use std::{
cmp::Ordering,
sync::{
atomic::{AtomicU64, Ordering as AtomicOrdering},
mpsc, Arc,
mpsc, Arc, Condvar, Mutex,
},
thread::{self, Builder as ThreadBuilder, JoinHandle},
time::Duration,
@@ -20,10 +20,10 @@ use txn_types::{Key, TimeStamp};
use super::{
compaction_filter::is_compaction_filter_allowed,
config::GcWorkerConfigManager,
gc_worker::{sync_gc, GcSafePointProvider, GcTask},
gc_worker::{schedule_gc, GcSafePointProvider, GcTask},
Result,
};
use crate::{server::metrics::*, tikv_util::sys::thread::StdThreadBuildWrapper};
use crate::{server::metrics::*, storage::Callback, tikv_util::sys::thread::StdThreadBuildWrapper};

const POLL_SAFE_POINT_INTERVAL_SECS: u64 = 10;

@@ -245,6 +245,8 @@ pub(super) struct GcManager<S: GcSafePointProvider, R: RegionInfoProvider, E: Kv

cfg_tracker: GcWorkerConfigManager,
feature_gate: FeatureGate,

max_concurrent_tasks: usize,
}

impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcManager<S, R, E> {
@@ -254,6 +256,7 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcMan
worker_scheduler: Scheduler<GcTask<E>>,
cfg_tracker: GcWorkerConfigManager,
feature_gate: FeatureGate,
concurrent_tasks: usize,
) -> GcManager<S, R, E> {
GcManager {
cfg,
Expand All @@ -263,6 +266,7 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcMan
gc_manager_ctx: GcManagerContext::new(),
cfg_tracker,
feature_gate,
max_concurrent_tasks: concurrent_tasks,
}
}

@@ -443,13 +447,27 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcMan
let mut progress = Some(Key::from_encoded(BEGIN_KEY.to_vec()));

// Records how many regions we have GC-ed.
let mut processed_regions = 0;
let mut scheduled_regions = 0;
let task_controller = Arc::new((Mutex::new(0), Condvar::new()));
// the task_controller is the <mutex,Condvar> combination to control the number
// of tasks The mutex is used for protecting the number of current
// tasks, while the condvar is used for notifying/get notified when the
// number of current tasks is changed.
let (lock, cvar) = &*task_controller;
let maybe_wait = |max_tasks| {
let mut current_tasks: std::sync::MutexGuard<'_, usize> = lock.lock().unwrap();
while *current_tasks > max_tasks {
// Wait until the number of current tasks is below the limit
current_tasks = cvar.wait(current_tasks).unwrap();
}
};

info!("gc_worker: auto gc starts"; "safe_point" => self.curr_safe_point());

// The following loop iterates over all regions whose leader is on this TiKV
// and does GC on them. At the same time, it periodically checks whether the
// safe_point has been updated; if it has, rewinding will happen.

loop {
self.gc_manager_ctx.check_stopped()?;
if is_compaction_filter_allowed(&self.cfg_tracker.value(), &self.feature_gate) {
@@ -463,9 +481,9 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcMan
// We have worked to the end and we need to rewind. Restart from beginning.
progress = Some(Key::from_encoded(BEGIN_KEY.to_vec()));
need_rewind = false;
info!("gc_worker: auto gc rewinds"; "processed_regions" => processed_regions);
info!("gc_worker: auto gc rewinds"; "scheduled_regions" => scheduled_regions);

processed_regions = 0;
scheduled_regions = 0;
// Set the metric to zero to show that rewinding has happened.
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
.with_label_values(&[PROCESS_TYPE_GC])
@@ -484,19 +502,40 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcMan
if finished {
// We have worked to the end of the TiKV or our progress has reached `end`, and
// we don't need to rewind. In this case, the round of GC has finished.
info!("gc_worker: auto gc finishes"; "processed_regions" => processed_regions);
return Ok(());
info!("gc_worker: all regions task are scheduled";
"processed_regions" => scheduled_regions,
);
break;
}
}

assert!(progress.is_some());

// Before doing GC, check whether safe_point is updated periodically to
// determine if rewinding is needed.
self.check_if_need_rewind(&progress, &mut need_rewind, &mut end);

progress = self.gc_next_region(progress.unwrap(), &mut processed_regions)?;
let controller: Arc<(Mutex<usize>, Condvar)> = Arc::clone(&task_controller);
let cb = Box::new(move |_res| {
let (lock, cvar) = &*controller;
let mut current_tasks = lock.lock().unwrap();
*current_tasks -= 1;
cvar.notify_one();
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
.with_label_values(&[PROCESS_TYPE_GC])
.inc();
});
maybe_wait(self.max_concurrent_tasks - 1);
let mut current_tasks = lock.lock().unwrap();
progress = self.async_gc_next_region(progress.unwrap(), cb, &mut current_tasks)?;
scheduled_regions += 1;
}

// Wait for all scheduled tasks to finish.
self.gc_manager_ctx.check_stopped()?;
maybe_wait(0);
info!("gc_worker: auto gc finishes"; "processed_regions" => scheduled_regions);

Ok(())
}

/// Checks whether we need to rewind in this round of GC. Only used in
@@ -537,13 +576,14 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcMan
}
}

/// Does GC on the next region after `from_key`. Returns the end key of the
/// region it processed. If we have processed to the end of all regions,
/// returns `None`.
fn gc_next_region(
/// Does GC on the next region after `from_key` asynchronously. Returns the
/// end key of the region it processed. If we have processed to the end
/// of all regions, returns `None`.
fn async_gc_next_region(
&mut self,
from_key: Key,
processed_regions: &mut usize,
callback: Callback<()>,
running_tasks: &mut usize,
) -> GcManagerResult<Option<Key>> {
// Get the information of the next region to do GC.
let (region, next_key) = self.get_next_gc_context(from_key);
Expand All @@ -553,16 +593,16 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcMan
let hex_end = format!("{:?}", log_wrappers::Value::key(region.get_end_key()));
debug!("trying gc"; "region_id" => region.id, "start_key" => &hex_start, "end_key" => &hex_end);

if let Err(e) = sync_gc(&self.worker_scheduler, region, self.curr_safe_point()) {
// Ignore the error and continue, since it's useless to retry this.
// TODO: Find a better way to handle errors. Maybe we should retry.
warn!("failed gc"; "start_key" => &hex_start, "end_key" => &hex_end, "err" => ?e);
}

*processed_regions += 1;
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
.with_label_values(&[PROCESS_TYPE_GC])
.inc();
let _ = schedule_gc(
&self.worker_scheduler,
region,
self.curr_safe_point(),
callback,
)
.map(|_| {
*running_tasks += 1;
Ok::<(), GcManagerError>(())
});

Ok(next_key)
}
@@ -711,8 +751,16 @@ mod tests {
impl GcManagerTestUtil {
pub fn new(regions: BTreeMap<Vec<u8>, RegionInfo>) -> Self {
let (gc_task_sender, gc_task_receiver) = channel();
let worker = WorkerBuilder::new("test-gc-manager").create();
let scheduler = worker.start("gc-manager", MockGcRunner { tx: gc_task_sender });
let worker = WorkerBuilder::new("test-gc-manager")
.thread_count(2)
.create();
let scheduler = worker.start(
"gc-manager",
MockGcRunner {
tx: gc_task_sender.clone(),
},
);
worker.start("gc-manager", MockGcRunner { tx: gc_task_sender });

let (safe_point_sender, safe_point_receiver) = channel();

@@ -732,6 +780,7 @@ mod tests {
scheduler,
GcWorkerConfigManager::default(),
Default::default(),
2,
);
Self {
gc_manager: Some(gc_manager),
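
The auto-GC loop above bounds the number of in-flight region GC tasks with an Arc<(Mutex<usize>, Condvar)>: the scheduler waits until a slot is free, increments the counter before scheduling a region, and every task's completion callback decrements the counter and notifies the waiter. Below is a minimal, self-contained sketch of that pattern; plain std threads stand in for TiKV's GC worker and the callback passed to schedule_gc, and all names are illustrative rather than real TiKV APIs.

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let max_concurrent_tasks = 2;
    let task_controller = Arc::new((Mutex::new(0usize), Condvar::new()));
    let (lock, cvar) = &*task_controller;

    // Block until the number of running tasks is at or below `max_tasks`.
    let maybe_wait = |max_tasks: usize| {
        let mut running = lock.lock().unwrap();
        while *running > max_tasks {
            running = cvar.wait(running).unwrap();
        }
    };

    let mut handles = Vec::new();
    for region in 0..8 {
        // Leave room for one more task, then claim the slot.
        maybe_wait(max_concurrent_tasks - 1);
        *lock.lock().unwrap() += 1;

        // The completion "callback": decrement the counter and wake the scheduler.
        let controller = Arc::clone(&task_controller);
        handles.push(thread::spawn(move || {
            thread::sleep(Duration::from_millis(10)); // pretend to GC one region
            let (lock, cvar) = &*controller;
            *lock.lock().unwrap() -= 1;
            cvar.notify_one();
            println!("finished region {}", region);
        }));
    }

    // Wait for every outstanding task, mirroring maybe_wait(0) at the end of a round.
    maybe_wait(0);
    for h in handles {
        h.join().unwrap();
    }
}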
