feat(mito): Integrate access layer and file purger to region (#2296)

* feat: alias SchedulerRef and clean scheduler on drop

* feat: add scheduler to workers

* feat: use access layer to read write sst

* feat: add purger to region

* refactor: allow getting region_dir from AccessLayer

* feat: add scheduler to FlushScheduler

* feat: getter for object store

* chore: fix typo

Co-authored-by: Ruihang Xia <[email protected]>

---------

Co-authored-by: Ruihang Xia <[email protected]>
evenyag and waynexia committed Sep 12, 2023
1 parent 46d171d commit 365e557
Showing 16 changed files with 197 additions and 150 deletions.
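The commit wires three things together: each region owns a shared SST access layer, a file purger removes SST files that are no longer referenced, and a background-job scheduler replaces the flush scheduler's hand-rolled queue. The region and purger wiring is in files not shown on this page, so the following is a minimal, self-contained sketch of the ownership shape implied by the commit messages; the MitoRegion fields and the FilePurger trait here are assumptions, not code from the diff.

    use std::sync::Arc;

    // Stand-ins for crate types: `AccessLayer` mirrors the struct in
    // src/mito2/src/access_layer.rs below; `FilePurger` is hypothetical,
    // since the purger code is in files this page does not show.
    struct AccessLayer {
        region_dir: String,
    }
    type AccessLayerRef = Arc<AccessLayer>;

    trait FilePurger: Send + Sync {
        fn purge(&self, file_name: &str);
    }
    type FilePurgerRef = Arc<dyn FilePurger>;

    // The region holds shared handles, so scanners, flush jobs, and the
    // purger all resolve SST paths through the same layer.
    struct MitoRegion {
        access_layer: AccessLayerRef,
        file_purger: FilePurgerRef,
    }

    struct NoopPurger;
    impl FilePurger for NoopPurger {
        fn purge(&self, file_name: &str) {
            println!("would delete {file_name}");
        }
    }

    fn main() {
        let region = MitoRegion {
            access_layer: Arc::new(AccessLayer {
                region_dir: "data/region_0000000000".to_string(),
            }),
            file_purger: Arc::new(NoopPurger),
        };
        region
            .file_purger
            .purge("0e3f9a2c-6f1f-4c8a-9a1e-7b2f3c4d5e6f.parquet");
        println!("region dir: {}", region.access_layer.region_dir);
    }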
50 changes: 41 additions & 9 deletions src/mito2/src/access_layer.rs
@@ -16,44 +16,76 @@ use std::sync::Arc;

use object_store::{util, ObjectStore};
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;

use crate::error::{DeleteSstSnafu, Result};
use crate::sst::file::FileId;
use crate::read::Source;
use crate::sst::file::{FileHandle, FileId};
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;

pub type AccessLayerRef = Arc<AccessLayer>;

/// Sst access layer.
/// A layer to access SST files under the same directory.
pub struct AccessLayer {
sst_dir: String,
region_dir: String,
object_store: ObjectStore,
}

impl std::fmt::Debug for AccessLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AccessLayer")
.field("sst_dir", &self.sst_dir)
.field("region_dir", &self.region_dir)
.finish()
}
}

impl AccessLayer {
pub fn new(sst_dir: &str, object_store: ObjectStore) -> AccessLayer {
/// Returns a new [AccessLayer] for specific `region_dir`.
pub fn new(region_dir: impl Into<String>, object_store: ObjectStore) -> AccessLayer {
AccessLayer {
sst_dir: sst_dir.to_string(),
region_dir: region_dir.into(),
object_store,
}
}

fn sst_file_path(&self, file_name: &str) -> String {
util::join_path(&self.sst_dir, file_name)
/// Returns the directory of the region.
pub fn region_dir(&self) -> &str {
&self.region_dir
}

/// Returns the object store of the layer.
pub fn object_store(&self) -> &ObjectStore {
&self.object_store
}

/// Deletes an SST file with the given file id.
pub async fn delete_sst(&self, file_id: FileId) -> Result<()> {
pub(crate) async fn delete_sst(&self, file_id: FileId) -> Result<()> {
let path = self.sst_file_path(&file_id.as_parquet());
self.object_store
.delete(&path)
.await
.context(DeleteSstSnafu { file_id })
}

/// Returns a reader builder for specific `file`.
pub(crate) fn read_sst(&self, file: FileHandle) -> ParquetReaderBuilder {
ParquetReaderBuilder::new(self.region_dir.clone(), file, self.object_store.clone())
}

/// Returns a new parquet writer to write the SST for specific `file_id`.
pub(crate) fn write_sst(
&self,
file_id: FileId,
metadata: RegionMetadataRef,
source: Source,
) -> ParquetWriter {
let path = self.sst_file_path(&file_id.as_parquet());
ParquetWriter::new(path, metadata, source, self.object_store.clone())
}

/// Returns the `file_path` for the `file_name` in the object store.
fn sst_file_path(&self, file_name: &str) -> String {
util::join_path(&self.region_dir, file_name)
}
}
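For orientation, a sketch of how a caller inside the crate might drive the new read path (read_sst is pub(crate), so this only compiles within mito2). The builder chain copies the one in seq_scan.rs further down; the next_batch loop assumes the crate's BatchReader trait, which this page imports but never defines.

    use crate::access_layer::AccessLayerRef;
    use crate::error::Result;
    use crate::read::BatchReader;
    use crate::sst::file::FileHandle;

    // Sketch, not crate code: open one SST through the access layer and
    // drain it batch by batch.
    async fn scan_one_file(layer: &AccessLayerRef, file: FileHandle) -> Result<()> {
        let mut reader = layer
            .read_sst(file)
            .predicate(None)   // no filter pushdown
            .time_range(None)  // unrestricted time range
            .projection(None)  // all columns
            .build()
            .await?;
        while let Some(_batch) = reader.next_batch().await? {
            // consume the batch here
        }
        Ok(())
    }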
12 changes: 12 additions & 0 deletions src/mito2/src/config.rs
@@ -20,6 +20,8 @@ use common_telemetry::warn;

/// Default region worker num.
const DEFAULT_NUM_WORKERS: usize = 1;
/// Default max number of running background jobs.
const DEFAULT_MAX_BG_JOB: usize = 4;
/// Default region write buffer size.
pub(crate) const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(32);

@@ -40,6 +42,10 @@ pub struct MitoConfig {
pub manifest_checkpoint_distance: u64,
/// Manifest compression type (default uncompressed).
pub manifest_compress_type: CompressionType,

// Background job configs:
/// Max number of running background jobs.
pub max_background_jobs: usize,
}

impl Default for MitoConfig {
@@ -50,6 +56,7 @@ impl Default for MitoConfig {
worker_request_batch_size: 64,
manifest_checkpoint_distance: 10,
manifest_compress_type: CompressionType::Uncompressed,
max_background_jobs: DEFAULT_MAX_BG_JOB,
}
}
}
@@ -75,5 +82,10 @@ impl MitoConfig {
warn!("Sanitize channel size 0 to 1");
self.worker_channel_size = 1;
}

if self.max_background_jobs == 0 {
warn!("Sanitize max background jobs 0 to {}", DEFAULT_MAX_BG_JOB);
self.max_background_jobs = DEFAULT_MAX_BG_JOB;
}
}
}
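A small sketch of the observable effect of the new check. It assumes the surrounding method is MitoConfig::sanitize(&mut self) (the hunk header above only shows impl MitoConfig) and that the remaining config fields are public with defaults.

    use crate::config::MitoConfig;

    fn sanitize_example() {
        let mut config = MitoConfig {
            max_background_jobs: 0, // invalid: no flush job could ever run
            ..MitoConfig::default()
        };
        config.sanitize();
        // 0 is replaced by DEFAULT_MAX_BG_JOB (4), with a warning logged.
        assert_eq!(config.max_background_jobs, 4);
    }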
17 changes: 2 additions & 15 deletions src/mito2/src/engine.rs
@@ -33,7 +33,6 @@ use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result};
use crate::flush::WriteBufferManagerImpl;
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::request::WorkerRequest;
use crate::worker::WorkerGroup;
@@ -106,15 +105,8 @@ impl EngineInner {
log_store: Arc<S>,
object_store: ObjectStore,
) -> EngineInner {
let write_buffer_manager = Arc::new(WriteBufferManagerImpl {});

EngineInner {
workers: WorkerGroup::start(
config,
log_store,
object_store.clone(),
write_buffer_manager,
),
workers: WorkerGroup::start(config, log_store, object_store.clone()),
object_store,
}
}
@@ -152,12 +144,7 @@ impl EngineInner {
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
let version = region.version();
let scan_region = ScanRegion::new(
version,
region.region_dir.clone(),
self.object_store.clone(),
request,
);
let scan_region = ScanRegion::new(version, region.access_layer.clone(), request);

scan_region.scanner()
}
37 changes: 9 additions & 28 deletions src/mito2/src/flush.rs
@@ -14,7 +14,7 @@

//! Flush related utilities and structs.

use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;
use std::sync::Arc;

use store_api::storage::RegionId;
@@ -23,8 +23,7 @@ use tokio::sync::oneshot::Sender;
use crate::error::Result;
use crate::region::MitoRegionRef;
use crate::request::{SenderDdlRequest, SenderWriteRequest};

const FLUSH_JOB_LIMIT: usize = 4;
use crate::schedule::scheduler::SchedulerRef;

/// Global write buffer (memtable) manager.
///
@@ -119,27 +118,21 @@ impl RegionFlushTask {

/// Manages background flushes of a worker.
pub(crate) struct FlushScheduler {
/// Pending flush tasks.
queue: VecDeque<RegionFlushTask>,
/// Tracks regions need to flush.
region_status: HashMap<RegionId, FlushStatus>,
/// Number of running flush jobs.
num_flush_running: usize,
/// Max number of background flush jobs.
job_limit: usize,
/// Background job scheduler.
scheduler: SchedulerRef,
}

impl Default for FlushScheduler {
fn default() -> Self {
impl FlushScheduler {
/// Creates a new flush scheduler.
pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
FlushScheduler {
queue: VecDeque::new(),
region_status: HashMap::new(),
num_flush_running: 0,
job_limit: FLUSH_JOB_LIMIT,
scheduler,
}
}
}

impl FlushScheduler {
/// Returns true if the region is stalling.
pub(crate) fn is_stalling(&self, region_id: RegionId) -> bool {
if let Some(status) = self.region_status.get(&region_id) {
@@ -170,21 +163,9 @@ impl FlushScheduler {
if flush_status.flushing_task.is_some() {
// There is already a flush job running.
flush_status.stalling = true;
self.queue.push_back(task);
return;
}

// Checks flush job limit.
debug_assert!(self.num_flush_running <= self.job_limit);
if !self.queue.is_empty() || self.num_flush_running >= self.job_limit {
debug_assert!(self.num_flush_running == self.job_limit);
// We reach job limit.
self.queue.push_back(task);
return;
}

// TODO(yingwen): Submit the flush job to job scheduler.

todo!()
}

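The deleted queue, num_flush_running, and job_limit fields were a per-worker job limiter; that job now belongs to the shared scheduler built from max_background_jobs, and the remaining todo!() marks where task submission will land. A self-contained sketch of the delegation, using a stand-in Scheduler trait; the real trait lives in src/mito2/src/schedule/scheduler.rs, which this page does not show.

    use std::future::Future;
    use std::pin::Pin;
    use std::sync::Arc;

    // Stand-ins: a boxed job future and a scheduler that decides when it runs.
    type Job = Pin<Box<dyn Future<Output = ()> + Send>>;
    trait Scheduler: Send + Sync {
        fn schedule(&self, job: Job) -> Result<(), String>;
    }
    type SchedulerRef = Arc<dyn Scheduler>;

    struct FlushScheduler {
        scheduler: SchedulerRef,
    }

    impl FlushScheduler {
        // Instead of queueing locally and counting running jobs, wrap the
        // flush work in a future and hand it to the shared scheduler, which
        // enforces the global background-job limit.
        fn submit_flush(&self, region_dir: String) -> Result<(), String> {
            self.scheduler.schedule(Box::pin(async move {
                println!("flushing region at {region_dir}");
            }))
        }
    }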
2 changes: 1 addition & 1 deletion src/mito2/src/lib.rs
@@ -43,7 +43,7 @@ pub mod request;
#[allow(dead_code)]
mod row_converter;
#[allow(dead_code)]
mod schedule;
pub(crate) mod schedule;
#[allow(dead_code)]
pub mod sst;
pub mod wal;
16 changes: 6 additions & 10 deletions src/mito2/src/read/scan_region.rs
@@ -17,11 +17,11 @@
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use common_time::range::TimestampRange;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::storage::ScanRequest;
use table::predicate::{Predicate, TimeRangePredicateBuilder};

use crate::access_layer::AccessLayerRef;
use crate::error::{BuildPredicateSnafu, Result};
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
@@ -85,10 +85,8 @@ impl Scanner {
pub(crate) struct ScanRegion {
/// Version of the region at scan.
version: VersionRef,
/// Directory of SST files.
file_dir: String,
/// Object store that stores SST files.
object_store: ObjectStore,
/// Access layer of the region.
access_layer: AccessLayerRef,
/// Scan request.
request: ScanRequest,
}
@@ -97,14 +95,12 @@ impl ScanRegion {
/// Creates a [ScanRegion].
pub(crate) fn new(
version: VersionRef,
file_dir: String,
object_store: ObjectStore,
access_layer: AccessLayerRef,
request: ScanRequest,
) -> ScanRegion {
ScanRegion {
version,
file_dir,
object_store,
access_layer,
request,
}
}
@@ -152,7 +148,7 @@ impl ScanRegion {
None => ProjectionMapper::all(&self.version.metadata)?,
};

let seq_scan = SeqScan::new(self.file_dir, self.object_store, mapper, self.request)
let seq_scan = SeqScan::new(self.access_layer.clone(), mapper, self.request)
.with_time_range(Some(time_range))
.with_predicate(Some(predicate))
.with_memtables(memtables)
33 changes: 13 additions & 20 deletions src/mito2/src/read/seq_scan.rs
@@ -21,27 +21,24 @@ use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
use common_time::range::TimestampRange;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::storage::ScanRequest;
use table::predicate::Predicate;

use crate::access_layer::AccessLayerRef;
use crate::error::Result;
use crate::memtable::MemtableRef;
use crate::read::merge::MergeReaderBuilder;
use crate::read::projection::ProjectionMapper;
use crate::read::BatchReader;
use crate::sst::file::FileHandle;
use crate::sst::parquet::reader::ParquetReaderBuilder;

/// Scans a region and returns rows in a sorted sequence.
///
/// The output order is always `order by primary key, time index`.
pub struct SeqScan {
/// Directory of SST files.
file_dir: String,
/// Object store that stores SST files.
object_store: ObjectStore,
/// Region SST access layer.
access_layer: AccessLayerRef,
/// Maps projected Batches to RecordBatches.
mapper: Arc<ProjectionMapper>,
/// Original scan request to scan memtable.
@@ -62,14 +59,12 @@ impl SeqScan {
/// Creates a new [SeqScan].
#[must_use]
pub(crate) fn new(
file_dir: String,
object_store: ObjectStore,
access_layer: AccessLayerRef,
mapper: ProjectionMapper,
request: ScanRequest,
) -> SeqScan {
SeqScan {
file_dir,
object_store,
access_layer,
mapper: Arc::new(mapper),
time_range: None,
predicate: None,
@@ -116,16 +111,14 @@ impl SeqScan {
builder.push_batch_iter(iter);
}
for file in &self.files {
let reader = ParquetReaderBuilder::new(
self.file_dir.clone(),
file.clone(),
self.object_store.clone(),
)
.predicate(self.predicate.clone())
.time_range(self.time_range)
.projection(Some(self.mapper.column_ids().to_vec()))
.build()
.await?;
let reader = self
.access_layer
.read_sst(file.clone())
.predicate(self.predicate.clone())
.time_range(self.time_range)
.projection(Some(self.mapper.column_ids().to_vec()))
.build()
.await?;
builder.push_batch_reader(Box::new(reader));
}
let mut reader = builder.build().await?;
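SeqScan's stated contract, output ordered by primary key then time index, is what lets it feed every memtable iterator and every access_layer.read_sst(...) reader into a single MergeReader. A tiny, self-contained illustration of that invariant follows; it mimics the result of the k-way merge, not the crate's implementation.

    fn main() {
        // (primary_key, timestamp) rows; each source is already sorted.
        let memtable = vec![("k1", 3_i64), ("k2", 1)];
        let sst_a = vec![("k1", 1), ("k2", 2)];
        let sst_b = vec![("k1", 2)];

        // MergeReader produces this order with a k-way merge over sorted
        // sources; a plain sort gives the same result for illustration.
        let mut merged: Vec<(&str, i64)> = memtable
            .into_iter()
            .chain(sst_a)
            .chain(sst_b)
            .collect();
        merged.sort();
        assert_eq!(
            merged,
            vec![("k1", 1), ("k1", 2), ("k1", 3), ("k2", 1), ("k2", 2)]
        );
    }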
(Diff truncated: the remaining 9 of the 16 changed files are not shown.)
