diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs
index 660cfd61e405..8e382c91313c 100644
--- a/src/mito2/src/access_layer.rs
+++ b/src/mito2/src/access_layer.rs
@@ -16,44 +16,76 @@
 use std::sync::Arc;
 
 use object_store::{util, ObjectStore};
 use snafu::ResultExt;
+use store_api::metadata::RegionMetadataRef;
 
 use crate::error::{DeleteSstSnafu, Result};
-use crate::sst::file::FileId;
+use crate::read::Source;
+use crate::sst::file::{FileHandle, FileId};
+use crate::sst::parquet::reader::ParquetReaderBuilder;
+use crate::sst::parquet::writer::ParquetWriter;
 
 pub type AccessLayerRef = Arc<AccessLayer>;
 
-/// Sst access layer.
+/// A layer to access SST files under the same directory.
 pub struct AccessLayer {
-    sst_dir: String,
+    region_dir: String,
     object_store: ObjectStore,
 }
 
 impl std::fmt::Debug for AccessLayer {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("AccessLayer")
-            .field("sst_dir", &self.sst_dir)
+            .field("region_dir", &self.region_dir)
             .finish()
     }
 }
 
 impl AccessLayer {
-    pub fn new(sst_dir: &str, object_store: ObjectStore) -> AccessLayer {
+    /// Returns a new [AccessLayer] for specific `region_dir`.
+    pub fn new(region_dir: impl Into<String>, object_store: ObjectStore) -> AccessLayer {
         AccessLayer {
-            sst_dir: sst_dir.to_string(),
+            region_dir: region_dir.into(),
             object_store,
         }
     }
 
-    fn sst_file_path(&self, file_name: &str) -> String {
-        util::join_path(&self.sst_dir, file_name)
+    /// Returns the directory of the region.
+    pub fn region_dir(&self) -> &str {
+        &self.region_dir
+    }
+
+    /// Returns the object store of the layer.
+    pub fn object_store(&self) -> &ObjectStore {
+        &self.object_store
     }
 
     /// Deletes a SST file with given file id.
-    pub async fn delete_sst(&self, file_id: FileId) -> Result<()> {
+    pub(crate) async fn delete_sst(&self, file_id: FileId) -> Result<()> {
         let path = self.sst_file_path(&file_id.as_parquet());
         self.object_store
             .delete(&path)
             .await
             .context(DeleteSstSnafu { file_id })
     }
+
+    /// Returns a reader builder for specific `file`.
+    pub(crate) fn read_sst(&self, file: FileHandle) -> ParquetReaderBuilder {
+        ParquetReaderBuilder::new(self.region_dir.clone(), file, self.object_store.clone())
+    }
+
+    /// Returns a new parquet writer to write the SST for specific `file_id`.
+    pub(crate) fn write_sst(
+        &self,
+        file_id: FileId,
+        metadata: RegionMetadataRef,
+        source: Source,
+    ) -> ParquetWriter {
+        let path = self.sst_file_path(&file_id.as_parquet());
+        ParquetWriter::new(path, metadata, source, self.object_store.clone())
+    }
+
+    /// Returns the `file_path` for the `file_name` in the object store.
+    fn sst_file_path(&self, file_name: &str) -> String {
+        util::join_path(&self.region_dir, file_name)
+    }
 }
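The layer now owns both path resolution and the object store, so read and write sites no longer pass a `(region_dir, ObjectStore)` pair around. A minimal sketch of the intended call shape, assuming `FileId::random()` from the `sst::file` module and with the inputs taken as parameters:

```rust
use object_store::ObjectStore;
use store_api::metadata::RegionMetadataRef;

use crate::access_layer::AccessLayer;
use crate::error::Result;
use crate::read::Source;
use crate::sst::file::{FileHandle, FileId};

// Sketch only; all types come from the mito2 crate shown above.
async fn write_then_read(
    object_store: ObjectStore,
    metadata: RegionMetadataRef,
    source: Source,
    file: FileHandle,
) -> Result<()> {
    let layer = AccessLayer::new("data/region_0042/", object_store);

    // Write path: the parquet file path is derived inside the layer.
    let _writer = layer.write_sst(FileId::random(), metadata, source);

    // Read path: the builder comes back preconfigured with the region
    // dir and object store; callers only add scan-specific options.
    let _reader = layer.read_sst(file).build().await?;
    Ok(())
}
```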
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index f1016f33ff95..8371aac90dad 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -20,6 +20,8 @@ use common_telemetry::warn;
 
 /// Default region worker num.
 const DEFAULT_NUM_WORKERS: usize = 1;
+/// Default max running background job.
+const DEFAULT_MAX_BG_JOB: usize = 4;
 
 /// Default region write buffer size.
 pub(crate) const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(32);
@@ -40,6 +42,10 @@ pub struct MitoConfig {
     pub manifest_checkpoint_distance: u64,
     /// Manifest compression type (default uncompressed).
     pub manifest_compress_type: CompressionType,
+
+    // Background job configs:
+    /// Max number of running background jobs.
+    pub max_background_jobs: usize,
 }
 
 impl Default for MitoConfig {
@@ -50,6 +56,7 @@
             worker_request_batch_size: 64,
             manifest_checkpoint_distance: 10,
             manifest_compress_type: CompressionType::Uncompressed,
+            max_background_jobs: DEFAULT_MAX_BG_JOB,
         }
     }
 }
@@ -75,5 +82,10 @@ impl MitoConfig {
             warn!("Sanitize channel size 0 to 1");
             self.worker_channel_size = 1;
         }
+
+        if self.max_background_jobs == 0 {
+            warn!("Sanitize max background jobs 0 to {}", DEFAULT_MAX_BG_JOB);
+            self.max_background_jobs = DEFAULT_MAX_BG_JOB;
+        }
     }
 }
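The new `sanitize` branch follows the same pattern as the channel-size check above: an invalid value is coerced back to the default with a warning rather than rejected at startup. For example:

```rust
use crate::config::MitoConfig;

#[test]
fn sanitize_zero_background_jobs() {
    let mut config = MitoConfig {
        max_background_jobs: 0, // zero would mean no flush or purge could ever run
        ..Default::default()
    };
    config.sanitize();
    // Coerced back to DEFAULT_MAX_BG_JOB (4) after logging a warning.
    assert_eq!(config.max_background_jobs, 4);
}
```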
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index b41fdcc75e09..503433e36f4c 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -33,7 +33,6 @@ use store_api::storage::{RegionId, ScanRequest};
 
 use crate::config::MitoConfig;
 use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result};
-use crate::flush::WriteBufferManagerImpl;
 use crate::read::scan_region::{ScanRegion, Scanner};
 use crate::request::WorkerRequest;
 use crate::worker::WorkerGroup;
@@ -106,15 +105,8 @@ impl EngineInner {
         log_store: Arc<S>,
         object_store: ObjectStore,
     ) -> EngineInner {
-        let write_buffer_manager = Arc::new(WriteBufferManagerImpl {});
-
         EngineInner {
-            workers: WorkerGroup::start(
-                config,
-                log_store,
-                object_store.clone(),
-                write_buffer_manager,
-            ),
+            workers: WorkerGroup::start(config, log_store, object_store.clone()),
             object_store,
         }
     }
@@ -152,12 +144,7 @@
             .get_region(region_id)
             .context(RegionNotFoundSnafu { region_id })?;
         let version = region.version();
-        let scan_region = ScanRegion::new(
-            version,
-            region.region_dir.clone(),
-            self.object_store.clone(),
-            request,
-        );
+        let scan_region = ScanRegion::new(version, region.access_layer.clone(), request);
 
         scan_region.scanner()
     }
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index e56829b3b0c6..50a1f19e94ef 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -14,7 +14,7 @@
 
 //! Flush related utilities and structs.
 
-use std::collections::{HashMap, VecDeque};
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use store_api::storage::RegionId;
@@ -23,8 +23,7 @@ use tokio::sync::oneshot::Sender;
 use crate::error::Result;
 use crate::region::MitoRegionRef;
 use crate::request::{SenderDdlRequest, SenderWriteRequest};
-
-const FLUSH_JOB_LIMIT: usize = 4;
+use crate::schedule::scheduler::SchedulerRef;
 
 /// Global write buffer (memtable) manager.
 ///
@@ -119,27 +118,21 @@ impl RegionFlushTask {
 
 /// Manages background flushes of a worker.
 pub(crate) struct FlushScheduler {
-    /// Pending flush tasks.
-    queue: VecDeque<RegionFlushTask>,
+    /// Tracks regions need to flush.
     region_status: HashMap<RegionId, FlushStatus>,
-    /// Number of running flush jobs.
-    num_flush_running: usize,
-    /// Max number of background flush jobs.
-    job_limit: usize,
+    /// Background job scheduler.
+    scheduler: SchedulerRef,
 }
 
-impl Default for FlushScheduler {
-    fn default() -> Self {
+impl FlushScheduler {
+    /// Creates a new flush scheduler.
+    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
         FlushScheduler {
-            queue: VecDeque::new(),
             region_status: HashMap::new(),
-            num_flush_running: 0,
-            job_limit: FLUSH_JOB_LIMIT,
+            scheduler,
         }
     }
-}
 
-impl FlushScheduler {
     /// Returns true if the region is stalling.
     pub(crate) fn is_stalling(&self, region_id: RegionId) -> bool {
         if let Some(status) = self.region_status.get(&region_id) {
@@ -170,21 +163,9 @@
         if flush_status.flushing_task.is_some() {
             // There is already a flush job running.
             flush_status.stalling = true;
-            self.queue.push_back(task);
             return;
         }
 
-        // Checks flush job limit.
-        debug_assert!(self.num_flush_running <= self.job_limit);
-        if !self.queue.is_empty() || self.num_flush_running >= self.job_limit {
-            debug_assert!(self.num_flush_running == self.job_limit);
-            // We reach job limit.
-            self.queue.push_back(task);
-            return;
-        }
-
-        // TODO(yingwen): Submit the flush job to job scheduler.
-        todo!()
     }
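With the queue and `job_limit` removed, flush concurrency is capped globally by `max_background_jobs` inside the shared scheduler, and the removed `todo!()` is not replaced within this hunk, so actual submission presumably lands elsewhere. A purely hypothetical shape for it, assuming `Job` is a pinned boxed `Future<Output = ()>` as the flume-based `LocalScheduler` implies:

```rust
// Hypothetical sketch only; this method is not part of the diff.
impl FlushScheduler {
    fn submit_flush(&mut self, task: RegionFlushTask) -> Result<()> {
        self.scheduler.schedule(Box::pin(async move {
            // ... write memtables to SSTs via AccessLayer::write_sst,
            // then notify the region worker loop ...
            drop(task);
        }))
    }
}
```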
diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs
index 87322aa24887..acf0fea14023 100644
--- a/src/mito2/src/lib.rs
+++ b/src/mito2/src/lib.rs
@@ -43,7 +43,7 @@ pub mod request;
 #[allow(dead_code)]
 mod row_converter;
 #[allow(dead_code)]
-mod schedule;
+pub(crate) mod schedule;
 #[allow(dead_code)]
 pub mod sst;
 pub mod wal;
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index db6414bf8dd6..5a05373faf5b 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -17,11 +17,11 @@
 use common_recordbatch::SendableRecordBatchStream;
 use common_telemetry::debug;
 use common_time::range::TimestampRange;
-use object_store::ObjectStore;
 use snafu::ResultExt;
 use store_api::storage::ScanRequest;
 use table::predicate::{Predicate, TimeRangePredicateBuilder};
 
+use crate::access_layer::AccessLayerRef;
 use crate::error::{BuildPredicateSnafu, Result};
 use crate::read::projection::ProjectionMapper;
 use crate::read::seq_scan::SeqScan;
@@ -85,10 +85,8 @@ impl Scanner {
 pub(crate) struct ScanRegion {
     /// Version of the region at scan.
     version: VersionRef,
-    /// Directory of SST files.
-    file_dir: String,
-    /// Object store that stores SST files.
-    object_store: ObjectStore,
+    /// Access layer of the region.
+    access_layer: AccessLayerRef,
     /// Scan request.
     request: ScanRequest,
 }
@@ -97,14 +95,12 @@ impl ScanRegion {
     /// Creates a [ScanRegion].
     pub(crate) fn new(
         version: VersionRef,
-        file_dir: String,
-        object_store: ObjectStore,
+        access_layer: AccessLayerRef,
         request: ScanRequest,
     ) -> ScanRegion {
         ScanRegion {
             version,
-            file_dir,
-            object_store,
+            access_layer,
             request,
         }
     }
@@ -152,7 +148,7 @@
             None => ProjectionMapper::all(&self.version.metadata)?,
         };
 
-        let seq_scan = SeqScan::new(self.file_dir, self.object_store, mapper, self.request)
+        let seq_scan = SeqScan::new(self.access_layer.clone(), mapper, self.request)
             .with_time_range(Some(time_range))
             .with_predicate(Some(predicate))
             .with_memtables(memtables)
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index 6ac8e37d3ec0..e2c42ab91857 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -21,27 +21,24 @@
 use common_error::ext::BoxedError;
 use common_recordbatch::error::ExternalSnafu;
 use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
 use common_time::range::TimestampRange;
-use object_store::ObjectStore;
 use snafu::ResultExt;
 use store_api::storage::ScanRequest;
 use table::predicate::Predicate;
 
+use crate::access_layer::AccessLayerRef;
 use crate::error::Result;
 use crate::memtable::MemtableRef;
 use crate::read::merge::MergeReaderBuilder;
 use crate::read::projection::ProjectionMapper;
 use crate::read::BatchReader;
 use crate::sst::file::FileHandle;
-use crate::sst::parquet::reader::ParquetReaderBuilder;
 
 /// Scans a region and returns rows in a sorted sequence.
 ///
 /// The output order is always `order by primary key, time index`.
 pub struct SeqScan {
-    /// Directory of SST files.
-    file_dir: String,
-    /// Object store that stores SST files.
-    object_store: ObjectStore,
+    /// Region SST access layer.
+    access_layer: AccessLayerRef,
     /// Maps projected Batches to RecordBatches.
     mapper: Arc<ProjectionMapper>,
     /// Original scan request to scan memtable.
@@ -62,14 +59,12 @@ impl SeqScan {
     /// Creates a new [SeqScan].
     #[must_use]
     pub(crate) fn new(
-        file_dir: String,
-        object_store: ObjectStore,
+        access_layer: AccessLayerRef,
         mapper: ProjectionMapper,
         request: ScanRequest,
     ) -> SeqScan {
         SeqScan {
-            file_dir,
-            object_store,
+            access_layer,
             mapper: Arc::new(mapper),
             time_range: None,
             predicate: None,
@@ -116,16 +111,14 @@
             builder.push_batch_iter(iter);
         }
         for file in &self.files {
-            let reader = ParquetReaderBuilder::new(
-                self.file_dir.clone(),
-                file.clone(),
-                self.object_store.clone(),
-            )
-            .predicate(self.predicate.clone())
-            .time_range(self.time_range)
-            .projection(Some(self.mapper.column_ids().to_vec()))
-            .build()
-            .await?;
+            let reader = self
+                .access_layer
+                .read_sst(file.clone())
+                .predicate(self.predicate.clone())
+                .time_range(self.time_range)
+                .projection(Some(self.mapper.column_ids().to_vec()))
+                .build()
+                .await?;
             builder.push_batch_reader(Box::new(reader));
         }
         let mut reader = builder.build().await?;
diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs
index 4d240a935d3a..0b7edbd37b2a 100644
--- a/src/mito2/src/region.rs
+++ b/src/mito2/src/region.rs
@@ -18,15 +18,18 @@ pub(crate) mod opener;
 pub(crate) mod version;
 
 use std::collections::HashMap;
+use std::sync::atomic::AtomicI64;
 use std::sync::{Arc, RwLock};
 
 use common_telemetry::info;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::RegionId;
 
+use crate::access_layer::AccessLayerRef;
 use crate::error::Result;
 use crate::manifest::manager::RegionManifestManager;
 use crate::region::version::{VersionControlRef, VersionRef};
+use crate::sst::file_purger::FilePurgerRef;
 
 /// Type to store region version.
 pub type VersionNumber = u32;
@@ -46,10 +49,14 @@ pub(crate) struct MitoRegion {
     /// Version controller for this region.
     pub(crate) version_control: VersionControlRef,
-    /// Data directory of the region.
-    pub(crate) region_dir: String,
+    /// SSTs accessor for this region.
+    pub(crate) access_layer: AccessLayerRef,
     /// Manager to maintain manifest for this region.
-    manifest_manager: RegionManifestManager,
+    pub(crate) manifest_manager: RegionManifestManager,
+    /// SST file purger.
+    pub(crate) file_purger: FilePurgerRef,
+    /// Last flush time in millis.
+    last_flush_millis: AtomicI64,
 }
 
 pub(crate) type MitoRegionRef = Arc<MitoRegion>;
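`last_flush_millis` is only declared in this hunk; its accessors fall outside the diff. A plausible pair, assuming relaxed ordering is acceptable for a heuristic timestamp:

```rust
use std::sync::atomic::Ordering;

// Hypothetical accessors; the diff above only adds the field.
impl MitoRegion {
    /// Returns the last flush time in millis.
    pub(crate) fn last_flush_millis(&self) -> i64 {
        self.last_flush_millis.load(Ordering::Relaxed)
    }

    /// Updates the timestamp after a successful flush.
    pub(crate) fn update_flush_millis(&self, now_millis: i64) {
        self.last_flush_millis.store(now_millis, Ordering::Relaxed);
    }
}
```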
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index 64187f9eea91..7fc33f759135 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -14,9 +14,11 @@
 
 //! Region opener.
 
+use std::sync::atomic::AtomicI64;
 use std::sync::Arc;
 
 use common_telemetry::info;
+use common_time::util::current_time_millis;
 use futures::StreamExt;
 use object_store::util::join_dir;
 use object_store::ObjectStore;
@@ -25,6 +27,7 @@ use store_api::logstore::LogStore;
 use store_api::metadata::RegionMetadata;
 use store_api::storage::RegionId;
 
+use crate::access_layer::AccessLayer;
 use crate::config::MitoConfig;
 use crate::error::{RegionCorruptedSnafu, RegionNotFoundSnafu, Result};
 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
@@ -32,6 +35,8 @@
 use crate::memtable::MemtableBuilderRef;
 use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
 use crate::region::MitoRegion;
 use crate::region_write_ctx::RegionWriteCtx;
+use crate::schedule::scheduler::SchedulerRef;
+use crate::sst::file_purger::LocalFilePurger;
 use crate::wal::{EntryId, Wal};
 
 /// Builder to create a new [MitoRegion] or open an existing one.
@@ -41,6 +46,7 @@ pub(crate) struct RegionOpener {
     memtable_builder: MemtableBuilderRef,
     object_store: ObjectStore,
     region_dir: String,
+    scheduler: SchedulerRef,
 }
 
 impl RegionOpener {
@@ -49,6 +55,7 @@
         region_id: RegionId,
         memtable_builder: MemtableBuilderRef,
         object_store: ObjectStore,
+        scheduler: SchedulerRef,
     ) -> RegionOpener {
         RegionOpener {
             region_id,
@@ -56,6 +63,7 @@
             memtable_builder,
             object_store,
             region_dir: String::new(),
+            scheduler,
         }
     }
 
@@ -82,7 +90,7 @@
         // Create a manifest manager for this region.
         let options = RegionManifestOptions {
             manifest_dir: new_manifest_dir(&self.region_dir),
-            object_store: self.object_store,
+            object_store: self.object_store.clone(),
             compress_type: config.manifest_compress_type,
             checkpoint_distance: config.manifest_checkpoint_distance,
         };
@@ -93,12 +101,15 @@
 
         let version = VersionBuilder::new(metadata, mutable).build();
         let version_control = Arc::new(VersionControl::new(version));
+        let access_layer = Arc::new(AccessLayer::new(self.region_dir, self.object_store.clone()));
 
         Ok(MitoRegion {
             region_id,
             version_control,
-            region_dir: self.region_dir,
+            access_layer: access_layer.clone(),
             manifest_manager,
+            file_purger: Arc::new(LocalFilePurger::new(self.scheduler, access_layer)),
+            last_flush_millis: AtomicI64::new(current_time_millis()),
         })
     }
 
@@ -112,7 +123,7 @@
     ) -> Result<MitoRegion> {
         let options = RegionManifestOptions {
             manifest_dir: new_manifest_dir(&self.region_dir),
-            object_store: self.object_store,
+            object_store: self.object_store.clone(),
             compress_type: config.manifest_compress_type,
             checkpoint_distance: config.manifest_checkpoint_distance,
         };
@@ -140,12 +151,15 @@
         let flushed_sequence = version.flushed_entry_id;
         let version_control = Arc::new(VersionControl::new(version));
         replay_memtable(wal, region_id, flushed_sequence, &version_control).await?;
+        let access_layer = Arc::new(AccessLayer::new(self.region_dir, self.object_store.clone()));
 
         let region = MitoRegion {
             region_id: self.region_id,
             version_control,
-            region_dir: self.region_dir,
+            access_layer: access_layer.clone(),
             manifest_manager,
+            file_purger: Arc::new(LocalFilePurger::new(self.scheduler, access_layer)),
+            last_flush_millis: AtomicI64::new(current_time_millis()),
         };
         Ok(region)
     }
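Both `create` and `open` now thread the scheduler through to a `LocalFilePurger`, so each region's purger draws from the engine-wide job pool. A sketch of the open path as a worker drives it, mirroring `worker/handle_open.rs` at the end of this diff (the trailing `.await?` is assumed, since `open` replays the WAL):

```rust
// Sketch: inside a worker loop handling an open request.
let region = RegionOpener::new(
    region_id,
    self.memtable_builder.clone(),
    self.object_store.clone(),
    self.scheduler.clone(), // shared background-job scheduler
)
.region_dir(&request.region_dir)
.open(&self.config, &self.wal)
.await?;
```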
diff --git a/src/mito2/src/schedule/scheduler.rs b/src/mito2/src/schedule/scheduler.rs
index 8ee896b5a8c0..27de01c6e07d 100644
--- a/src/mito2/src/schedule/scheduler.rs
+++ b/src/mito2/src/schedule/scheduler.rs
@@ -36,7 +36,7 @@
 const STATE_AWAIT_TERMINATION: u8 = 2;
 
 /// [Scheduler] defines a set of API to schedule Jobs
 #[async_trait::async_trait]
-pub trait Scheduler {
+pub trait Scheduler: Send + Sync {
     /// Schedules a Job
     fn schedule(&self, job: Job) -> Result<()>;
 
@@ -44,6 +44,8 @@
     async fn stop(&self, await_termination: bool) -> Result<()>;
 }
 
+pub type SchedulerRef = Arc<dyn Scheduler>;
+
 /// Request scheduler based on local state.
 pub struct LocalScheduler {
     /// Sends jobs to flume bounded channel
@@ -57,7 +59,8 @@
 }
 
 impl LocalScheduler {
-    /// cap: flume bounded cap
+    /// Starts a new scheduler.
+    ///
     /// concurrency: the number of bounded receiver
     pub fn new(concurrency: usize) -> Self {
         let (tx, rx) = flume::unbounded();
@@ -153,7 +156,11 @@ impl Scheduler for LocalScheduler {
 impl Drop for LocalScheduler {
     fn drop(&mut self) {
         if self.state.load(Ordering::Relaxed) != STATE_STOP {
-            logging::error!("scheduler must be stopped before dropping, which means the state of scheduler must be STATE_STOP");
+            logging::warn!("scheduler should be stopped before dropping, which means the state of scheduler must be STATE_STOP");
+
+            // We didn't call `stop()` so we cancel all background workers here.
+            self.sender.write().unwrap().take();
+            self.cancel_token.cancel();
         }
     }
 }
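The scheduler is now shared as a trait object (`SchedulerRef`), which is why the trait gains the `Send + Sync` bounds. A small usage sketch, assuming `Job` is a pinned boxed future as the flume-based workers imply:

```rust
use std::sync::Arc;

use crate::error::Result;
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};

async fn run_background_jobs() -> Result<()> {
    // At most 4 jobs run concurrently; further jobs wait in the channel.
    let scheduler: SchedulerRef = Arc::new(LocalScheduler::new(4));

    scheduler.schedule(Box::pin(async {
        // background work, e.g. an SST flush or purge
    }))?;

    // `true` waits for in-flight and queued jobs. Thanks to the new
    // `Drop` impl, forgetting this cancels the workers instead of
    // leaving them running.
    scheduler.stop(true).await
}
```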
diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs
index 9b289f80fd5f..ace502f4a659 100644
--- a/src/mito2/src/sst/file_purger.rs
+++ b/src/mito2/src/sst/file_purger.rs
@@ -12,13 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::fmt;
 use std::sync::Arc;
 
 use common_telemetry::{error, info};
 use store_api::storage::RegionId;
 
 use crate::access_layer::AccessLayerRef;
-use crate::schedule::scheduler::{LocalScheduler, Scheduler};
+use crate::schedule::scheduler::SchedulerRef;
 use crate::sst::file::FileId;
 
 /// Request to remove a file.
@@ -31,7 +32,7 @@ pub struct PurgeRequest {
 }
 
 /// A worker to delete files in background.
-pub trait FilePurger: Send + Sync {
+pub trait FilePurger: Send + Sync + fmt::Debug {
     /// Send a purge request to the background worker.
     fn send_request(&self, request: PurgeRequest);
 }
 
@@ -39,13 +40,21 @@
 pub type FilePurgerRef = Arc<dyn FilePurger>;
 
 pub struct LocalFilePurger {
-    scheduler: Arc<LocalScheduler>,
+    scheduler: SchedulerRef,
     sst_layer: AccessLayerRef,
 }
 
+impl fmt::Debug for LocalFilePurger {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("LocalFilePurger")
+            .field("sst_layer", &self.sst_layer)
+            .finish()
+    }
+}
+
 impl LocalFilePurger {
-    pub fn new(scheduler: Arc<LocalScheduler>, sst_layer: AccessLayerRef) -> Self {
+    pub fn new(scheduler: SchedulerRef, sst_layer: AccessLayerRef) -> Self {
         Self {
             scheduler,
             sst_layer,
@@ -84,7 +93,7 @@ mod tests {
 
     use super::*;
     use crate::access_layer::AccessLayer;
-    use crate::schedule::scheduler::LocalScheduler;
+    use crate::schedule::scheduler::{LocalScheduler, Scheduler};
    use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange};
 
     #[tokio::test]
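`FilePurger` now also requires `fmt::Debug`, which is why `LocalFilePurger` gains a manual `Debug` impl (its scheduler field is not `Debug`). Implementors whose state is printable can simply derive it; a minimal test-only purger, as a sketch:

```rust
use crate::sst::file_purger::{FilePurger, PurgeRequest};

/// Hypothetical no-op purger for tests that only need a `FilePurgerRef`.
#[derive(Debug)]
struct NoopFilePurger;

impl FilePurger for NoopFilePurger {
    fn send_request(&self, _request: PurgeRequest) {
        // Intentionally ignores the request; nothing is deleted.
    }
}
```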
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index a286f336f27b..f5f9c04571f2 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -31,9 +31,9 @@
 use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
 use crate::sst::stream_writer::BufferedWriter;
 
 /// Parquet SST writer.
-pub struct ParquetWriter<'a> {
+pub struct ParquetWriter {
     /// SST output file path.
-    file_path: &'a str,
+    file_path: String,
     /// Input data source.
     source: Source,
     /// Region metadata of the source and the target SST.
@@ -41,10 +41,10 @@
     object_store: ObjectStore,
 }
 
-impl<'a> ParquetWriter<'a> {
+impl ParquetWriter {
     /// Creates a new parquet SST writer.
     pub fn new(
-        file_path: &'a str,
+        file_path: String,
         metadata: RegionMetadataRef,
         source: Source,
         object_store: ObjectStore,
@@ -87,7 +87,7 @@
         let write_format = WriteFormat::new(self.metadata.clone());
 
         let mut buffered_writer = BufferedWriter::try_new(
-            self.file_path.to_string(),
+            self.file_path.clone(),
             self.object_store.clone(),
             write_format.arrow_schema(),
             Some(writer_props),
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 285c5096add2..37d3f8410d25 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -35,7 +35,6 @@ use store_api::region_request::RegionCreateRequest;
 
 use crate::config::MitoConfig;
 use crate::engine::MitoEngine;
 use crate::error::Result;
-use crate::flush::WriteBufferManagerImpl;
 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
 use crate::read::{Batch, BatchBuilder, BatchReader};
 use crate::worker::WorkerGroup;
@@ -95,12 +94,7 @@ impl TestEnv {
     pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup {
         let (log_store, object_store) = self.create_log_and_object_store().await;
 
-        WorkerGroup::start(
-            config,
-            Arc::new(log_store),
-            object_store,
-            Arc::new(WriteBufferManagerImpl {}),
-        )
+        WorkerGroup::start(config, Arc::new(log_store), object_store)
     }
 
     async fn create_log_and_object_store(&self) -> (RaftEngineLogStore, ObjectStore) {
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 283cff6773ed..a829624bfa2e 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -37,11 +37,12 @@ use tokio::sync::{mpsc, Mutex};
 
 use crate::config::MitoConfig;
 use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
-use crate::flush::{FlushScheduler, WriteBufferManagerRef};
+use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
 use crate::memtable::time_series::TimeSeriesMemtableBuilder;
 use crate::memtable::MemtableBuilderRef;
 use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
 use crate::request::{BackgroundNotify, DdlRequest, SenderDdlRequest, WorkerRequest};
+use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
 use crate::wal::Wal;
 
 /// Identifier for a worker.
@@ -84,9 +85,9 @@
 /// Chan0 --> Buffer0
 /// Chan1 --> WorkerThread1
 /// ```
-#[derive(Debug)]
 pub(crate) struct WorkerGroup {
     workers: Vec<RegionWorker>,
+    scheduler: SchedulerRef,
 }
 
 impl WorkerGroup {
@@ -97,24 +98,27 @@
         config: MitoConfig,
         log_store: Arc<S>,
         object_store: ObjectStore,
-        write_buffer_manager: WriteBufferManagerRef,
     ) -> WorkerGroup {
         assert!(config.num_workers.is_power_of_two());
         let config = Arc::new(config);
+        let write_buffer_manager = Arc::new(WriteBufferManagerImpl {});
+        let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
 
         let workers = (0..config.num_workers)
             .map(|id| {
-                RegionWorker::start(
-                    id as WorkerId,
-                    config.clone(),
-                    log_store.clone(),
-                    object_store.clone(),
-                    write_buffer_manager.clone(),
-                )
+                WorkerStarter {
+                    id: id as WorkerId,
+                    config: config.clone(),
+                    log_store: log_store.clone(),
+                    object_store: object_store.clone(),
+                    write_buffer_manager: write_buffer_manager.clone(),
+                    scheduler: scheduler.clone(),
+                }
+                .start()
             })
             .collect();
 
-        WorkerGroup { workers }
+        WorkerGroup { workers, scheduler }
     }
 
     /// Stop the worker group.
@@ -123,6 +127,8 @@
 
         try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;
 
+        self.scheduler.stop(true).await?;
+
         Ok(())
     }
@@ -162,59 +168,65 @@ fn value_to_index(value: usize, num_workers: usize) -> usize {
     value & (num_workers - 1)
 }
 
-/// Worker to write and alter regions bound to it.
-#[derive(Debug)]
-pub(crate) struct RegionWorker {
-    /// Id of the worker.
+/// Worker start config.
+struct WorkerStarter<S> {
     id: WorkerId,
-    /// Regions bound to the worker.
-    regions: RegionMapRef,
-    /// Request sender.
-    sender: Sender<WorkerRequest>,
-    /// Handle to the worker thread.
-    handle: Mutex<Option<JoinHandle<()>>>,
-    /// Whether to run the worker thread.
-    running: Arc<AtomicBool>,
+    config: Arc<MitoConfig>,
+    log_store: Arc<S>,
+    object_store: ObjectStore,
+    write_buffer_manager: WriteBufferManagerRef,
+    scheduler: SchedulerRef,
 }
 
-impl RegionWorker {
+impl<S: LogStore> WorkerStarter<S> {
     /// Start a region worker and its background thread.
-    fn start<S: LogStore>(
-        id: WorkerId,
-        config: Arc<MitoConfig>,
-        log_store: Arc<S>,
-        object_store: ObjectStore,
-        write_buffer_manager: WriteBufferManagerRef,
-    ) -> RegionWorker {
+    fn start(self) -> RegionWorker {
         let regions = Arc::new(RegionMap::default());
-        let (sender, receiver) = mpsc::channel(config.worker_channel_size);
+        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);
 
         let running = Arc::new(AtomicBool::new(true));
         let mut worker_thread = RegionWorkerLoop {
-            id,
-            config,
+            id: self.id,
+            config: self.config,
             regions: regions.clone(),
             receiver,
-            wal: Wal::new(log_store),
-            object_store,
+            wal: Wal::new(self.log_store),
+            object_store: self.object_store,
             running: running.clone(),
             memtable_builder: Arc::new(TimeSeriesMemtableBuilder::default()),
-            write_buffer_manager,
-            flush_scheduler: FlushScheduler::default(),
+            scheduler: self.scheduler.clone(),
+            write_buffer_manager: self.write_buffer_manager,
+            flush_scheduler: FlushScheduler::new(self.scheduler),
         };
         let handle = common_runtime::spawn_write(async move {
             worker_thread.run().await;
         });
 
         RegionWorker {
-            id,
+            id: self.id,
             regions,
             sender,
             handle: Mutex::new(Some(handle)),
             running,
         }
     }
+}
+
+/// Worker to write and alter regions bound to it.
+pub(crate) struct RegionWorker {
+    /// Id of the worker.
+    id: WorkerId,
+    /// Regions bound to the worker.
+    regions: RegionMapRef,
+    /// Request sender.
+    sender: Sender<WorkerRequest>,
+    /// Handle to the worker thread.
+    handle: Mutex<Option<JoinHandle<()>>>,
+    /// Whether to run the worker thread.
+    running: Arc<AtomicBool>,
+}
 
+impl RegionWorker {
     /// Submit request to background worker thread.
     async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
         ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
@@ -300,7 +312,8 @@ struct RegionWorkerLoop<S> {
     running: Arc<AtomicBool>,
     /// Memtable builder for each region.
     memtable_builder: MemtableBuilderRef,
-
+    /// Background job scheduler.
+    scheduler: SchedulerRef,
     /// Engine write buffer manager.
     write_buffer_manager: WriteBufferManagerRef,
     /// Schedules background flush requests.
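Shutdown ordering matters now that the group owns the scheduler: workers stop first, so no new jobs are submitted, and only then does `scheduler.stop(true)` drain what is queued. A usage sketch, assuming a `log_store` and `object_store` are already constructed:

```rust
use std::sync::Arc;

use object_store::ObjectStore;
use store_api::logstore::LogStore;

use crate::config::MitoConfig;
use crate::error::Result;
use crate::worker::WorkerGroup;

async fn lifecycle<S: LogStore>(log_store: Arc<S>, object_store: ObjectStore) -> Result<()> {
    // The group builds its own write buffer manager and LocalScheduler,
    // sized by `config.max_background_jobs`.
    let group = WorkerGroup::start(MitoConfig::default(), log_store, object_store);
    // ... route region requests through the group ...
    group.stop().await // joins workers first, then awaits scheduler.stop(true)
}
```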
diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs
index 3c11fe1cbc13..31fee02008ed 100644
--- a/src/mito2/src/worker/handle_create.rs
+++ b/src/mito2/src/worker/handle_create.rs
@@ -58,6 +58,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             region_id,
             self.memtable_builder.clone(),
             self.object_store.clone(),
+            self.scheduler.clone(),
         )
         .metadata(metadata)
         .region_dir(&request.region_dir)
diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs
index c6b6c73fecb3..64f50788d861 100644
--- a/src/mito2/src/worker/handle_open.rs
+++ b/src/mito2/src/worker/handle_open.rs
@@ -43,6 +43,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             region_id,
             self.memtable_builder.clone(),
             self.object_store.clone(),
+            self.scheduler.clone(),
         )
         .region_dir(&request.region_dir)
         .open(&self.config, &self.wal)