From 6d706fb358b34171503eaadc152790721bb21c1d Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 7 Aug 2023 14:43:10 +0800 Subject: [PATCH 01/10] feat: hold wal entry in RegionWriteCtx --- src/mito2/src/worker/handle_write.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index cc6b599ef6cf..104067fad2b7 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -16,7 +16,8 @@ use std::collections::{hash_map, HashMap}; -use greptime_proto::v1::mito::Mutation; +use greptime_proto::v1::mito::{Mutation, WalEntry}; +use store_api::storage::RegionId; use tokio::sync::oneshot::Sender; use crate::error::{RegionNotFoundSnafu, Result}; @@ -33,6 +34,16 @@ impl RegionWorkerLoop { return; } + let region_ctxs = self.prepare_region_write_ctx(write_requests); + + todo!() + } + + /// Validates and groups requests by region. + fn prepare_region_write_ctx( + &self, + write_requests: Vec, + ) -> HashMap { let mut region_ctxs = HashMap::new(); for sender_req in write_requests { let region_id = sender_req.request.region_id; @@ -66,7 +77,7 @@ impl RegionWorkerLoop { region_ctx.push_sender_request(sender_req); } - todo!() + region_ctxs } } @@ -84,8 +95,8 @@ struct RegionWriteCtx { region: MitoRegionRef, /// Version of the region while creating the context. version: VersionRef, - /// Valid mutations. - mutations: Vec, + /// Valid WAL entry to write. + wal_entry: WalEntry, /// Result senders. /// /// The sender is 1:1 map to the mutation in `mutations`. @@ -99,14 +110,14 @@ impl RegionWriteCtx { RegionWriteCtx { region, version, - mutations: Vec::new(), + wal_entry: WalEntry::default(), senders: Vec::new(), } } /// Push [SenderWriteRequest] to the context. fn push_sender_request(&mut self, sender_req: SenderWriteRequest) { - self.mutations.push(Mutation { + self.wal_entry.mutations.push(Mutation { op_type: to_proto_op_type(sender_req.request.op_type) as i32, sequence: 0, // TODO(yingwen): Set sequence. rows: Some(sender_req.request.rows), From b59f1f6b60a24ed3373fcc5accfa92eb49dd6c4f Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 7 Aug 2023 15:51:53 +0800 Subject: [PATCH 02/10] feat: entry id and commited sequence --- src/mito2/src/region/version.rs | 35 +++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 54fe29df3ca8..be5ef2c4153d 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -23,38 +23,57 @@ //! Reason: data may be flushed/compacted and some data with old sequence may be removed //! and became invisible between step 1 and 2, so need to acquire version at first. -use std::sync::Arc; +use std::sync::{Arc, RwLock}; -use arc_swap::ArcSwap; use store_api::storage::SequenceNumber; use crate::memtable::version::{MemtableVersion, MemtableVersionRef}; use crate::memtable::MemtableRef; use crate::metadata::RegionMetadataRef; use crate::sst::version::{SstVersion, SstVersionRef}; +use crate::wal::EntryId; -/// Controls version of in memory metadata for a region. +/// Controls metadata and sequence numbers for a region. +/// +/// It manages metadata in a copy-on-write fashion. Any modification to a region's metadata +/// will generate a new [Version]. #[derive(Debug)] pub(crate) struct VersionControl { /// Latest version. - version: ArcSwap, + version: VersionRef, + /// Sequence number of last committed data. + committed_sequence: SequenceNumber, + /// Last WAL entry Id. + last_entry_id: EntryId, } impl VersionControl { /// Returns a new [VersionControl] with specific `version`. pub(crate) fn new(version: Version) -> VersionControl { VersionControl { - version: ArcSwap::new(Arc::new(version)), + version: Arc::new(version), + committed_sequence: 0, + last_entry_id: 0, } } /// Returns current [Version]. - pub(crate) fn current(&self) -> VersionRef { - self.version.load_full() + pub(crate) fn version(&self) -> VersionRef { + self.version.clone() + } + + /// Returns last committed sequence. + pub(crate) fn committed_sequence(&self) -> SequenceNumber { + self.committed_sequence + } + + /// Returns last entry id. + pub(crate) fn last_entry_id(&self) -> EntryId { + self.last_entry_id } } -pub(crate) type VersionControlRef = Arc; +pub(crate) type VersionControlRef = Arc>; /// Static metadata of a region. #[derive(Clone, Debug)] From e6569986c28da4152a8d8c79abbfc99675ee31a6 Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 7 Aug 2023 19:34:17 +0800 Subject: [PATCH 03/10] feat: write to wal --- src/mito2/src/error.rs | 6 ++ src/mito2/src/region.rs | 9 +-- src/mito2/src/region/version.rs | 42 ++++++------ src/mito2/src/worker.rs | 4 +- src/mito2/src/worker/handle_write.rs | 96 +++++++++++++++++++++++++--- 5 files changed, 117 insertions(+), 40 deletions(-) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index e5778a9c2bfa..fc99ff96b910 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::sync::Arc; use common_datasource::compression::CompressionType; use common_error::ext::{BoxedError, ErrorExt}; @@ -260,6 +261,10 @@ pub enum Error { location: Location, source: BoxedError, }, + + // Shared error for each writer in the write group. + #[snafu(display("Failed to write region, source: {}", source))] + WriteGroup { source: Arc }, } pub type Result = std::result::Result; @@ -296,6 +301,7 @@ impl ErrorExt for Error { | EncodeWal { .. } | DecodeWal { .. } => StatusCode::Internal, WriteBuffer { source, .. } => source.status_code(), + WriteGroup { source, .. } => source.status_code(), } } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 2be5ddf2478c..d200ce2a8bed 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -25,7 +25,7 @@ use store_api::storage::RegionId; use crate::error::Result; use crate::manifest::manager::RegionManifestManager; -use crate::region::version::{VersionControlRef, VersionRef}; +use crate::region::version::VersionControlRef; /// Type to store region version. pub type VersionNumber = u32; @@ -40,7 +40,7 @@ pub(crate) struct MitoRegion { pub(crate) region_id: RegionId, /// Version controller for this region. - version_control: VersionControlRef, + pub(crate) version_control: VersionControlRef, /// Manager to maintain manifest for this region. manifest_manager: RegionManifestManager, } @@ -56,11 +56,6 @@ impl MitoRegion { Ok(()) } - - /// Returns current version of the region. - pub(crate) fn version(&self) -> VersionRef { - self.version_control.current() - } } /// Regions indexed by ids. diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index be5ef2c4153d..7e1f61f7476e 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -39,42 +39,40 @@ use crate::wal::EntryId; /// will generate a new [Version]. #[derive(Debug)] pub(crate) struct VersionControl { - /// Latest version. - version: VersionRef, - /// Sequence number of last committed data. - committed_sequence: SequenceNumber, - /// Last WAL entry Id. - last_entry_id: EntryId, + data: RwLock, } impl VersionControl { /// Returns a new [VersionControl] with specific `version`. pub(crate) fn new(version: Version) -> VersionControl { VersionControl { - version: Arc::new(version), - committed_sequence: 0, - last_entry_id: 0, + data: RwLock::new(VersionControlData { + version: Arc::new(version), + committed_sequence: 0, + last_entry_id: 0, + }), } } - /// Returns current [Version]. - pub(crate) fn version(&self) -> VersionRef { - self.version.clone() + /// Returns current copy of data. + pub(crate) fn current(&self) -> VersionControlData { + self.data.read().unwrap().clone() } +} - /// Returns last committed sequence. - pub(crate) fn committed_sequence(&self) -> SequenceNumber { - self.committed_sequence - } +pub(crate) type VersionControlRef = Arc; - /// Returns last entry id. - pub(crate) fn last_entry_id(&self) -> EntryId { - self.last_entry_id - } +/// Data of [VersionControl]. +#[derive(Debug, Clone)] +pub(crate) struct VersionControlData { + /// Latest version. + pub(crate) version: VersionRef, + /// Sequence number of last committed data. + pub(crate) committed_sequence: SequenceNumber, + /// Last WAL entry Id. + pub(crate) last_entry_id: EntryId, } -pub(crate) type VersionControlRef = Arc>; - /// Static metadata of a region. #[derive(Clone, Debug)] pub(crate) struct Version { diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 2ba836666006..5d94e4df5cf7 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -285,7 +285,7 @@ struct RegionWorkerLoop { memtable_builder: MemtableBuilderRef, } -impl RegionWorkerLoop { +impl RegionWorkerLoop { /// Starts the worker loop. async fn run(&mut self) { info!("Start region worker thread {}", self.id); @@ -353,7 +353,9 @@ impl RegionWorkerLoop { self.handle_ddl_requests(ddl_requests).await; } +} +impl RegionWorkerLoop { /// Takes and handles all ddl requests. async fn handle_ddl_requests(&mut self, ddl_requests: Vec) { if ddl_requests.is_empty() { diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 104067fad2b7..ccdc058efee0 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -15,30 +15,52 @@ //! Handling write requests. use std::collections::{hash_map, HashMap}; +use std::mem; +use std::sync::Arc; use greptime_proto::v1::mito::{Mutation, WalEntry}; -use store_api::storage::RegionId; +use snafu::ResultExt; +use store_api::logstore::LogStore; +use store_api::storage::{RegionId, SequenceNumber}; use tokio::sync::oneshot::Sender; -use crate::error::{RegionNotFoundSnafu, Result}; +use crate::error::{Error, RegionNotFoundSnafu, Result, WriteGroupSnafu}; use crate::proto_util::to_proto_op_type; -use crate::region::version::VersionRef; +use crate::region::version::{VersionControlData, VersionRef}; use crate::region::MitoRegionRef; use crate::request::SenderWriteRequest; +use crate::wal::{EntryId, WalWriter}; use crate::worker::RegionWorkerLoop; -impl RegionWorkerLoop { +impl RegionWorkerLoop { /// Takes and handles all write requests. pub(crate) async fn handle_write_requests(&mut self, write_requests: Vec) { if write_requests.is_empty() { return; } - let region_ctxs = self.prepare_region_write_ctx(write_requests); + let mut region_ctxs = self.prepare_region_write_ctx(write_requests); + + // Write WAL. + let mut wal_writer = self.wal.writer(); + for region_ctx in region_ctxs.values_mut() { + if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) { + region_ctx.set_error(e); + } + } + if let Err(e) = wal_writer.write_to_wal().await.map_err(Arc::new) { + // Failed to write wal. + for mut region_ctx in region_ctxs.into_values() { + region_ctx.set_error(e.clone()); + } + return; + } todo!() } +} +impl RegionWorkerLoop { /// Validates and groups requests by region. fn prepare_region_write_ctx( &self, @@ -95,33 +117,87 @@ struct RegionWriteCtx { region: MitoRegionRef, /// Version of the region while creating the context. version: VersionRef, + /// Next sequence number to write. + /// + /// The context assigns a unique sequence number for each row. + next_sequence: SequenceNumber, + /// Next entry id of WAL to write. + next_entry_id: EntryId, /// Valid WAL entry to write. wal_entry: WalEntry, + /// Error during writing this region. + err: Option>, /// Result senders. /// - /// The sender is 1:1 map to the mutation in `mutations`. - senders: Vec>>>, + /// All senders will receive the same result. + senders: Vec>>, } impl RegionWriteCtx { /// Returns an empty context. fn new(region: MitoRegionRef) -> RegionWriteCtx { - let version = region.version(); + let VersionControlData { + version, + committed_sequence, + last_entry_id, + } = region.version_control.current(); RegionWriteCtx { region, version, + next_sequence: committed_sequence + 1, + next_entry_id: last_entry_id + 1, wal_entry: WalEntry::default(), + err: None, senders: Vec::new(), } } /// Push [SenderWriteRequest] to the context. fn push_sender_request(&mut self, sender_req: SenderWriteRequest) { + let num_rows = sender_req.request.rows.rows.len() as u64; + self.wal_entry.mutations.push(Mutation { op_type: to_proto_op_type(sender_req.request.op_type) as i32, - sequence: 0, // TODO(yingwen): Set sequence. + sequence: self.next_sequence, rows: Some(sender_req.request.rows), }); - self.senders.push(sender_req.sender); + if let Some(sender) = sender_req.sender { + self.senders.push(sender); + } + + // Increase sequence number. + self.next_sequence += num_rows; + } + + /// Encode and add WAL entry to the writer. + fn add_wal_entry(&self, wal_writer: &mut WalWriter) -> Result<()> { + wal_writer.add_entry(self.region.region_id, self.next_entry_id, &self.wal_entry) + } + + /// Sets error and marks the write operation is failed. + /// + /// The context will send the error to waiters on drop. + fn set_error(&mut self, err: Arc) { + self.err = Some(err); + } + + /// Sends result to waiters. + fn notify_result(&mut self) { + let senders = mem::take(&mut self.senders); + for sender in senders { + if let Some(err) = &self.err { + // Try to send the error to waiters. + let _ = sender.send(Err(err.clone()).context(WriteGroupSnafu)); + } else { + // Send success result. + let _ = sender.send(Ok(())); + } + } + } +} + +impl Drop for RegionWriteCtx { + fn drop(&mut self) { + self.notify_result(); } } From ba4eb25d982679468ea5005fd5b338ae1e5cd723 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 9 Aug 2023 14:11:38 +0800 Subject: [PATCH 04/10] feat: write memtable --- src/mito2/src/memtable.rs | 2 +- src/mito2/src/memtable/version.rs | 5 ++ src/mito2/src/worker/handle_write.rs | 101 ++++++++++++++++++--------- 3 files changed, 75 insertions(+), 33 deletions(-) diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index e23a8daa8cd3..8b830a233b98 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use crate::error::Result; -use crate::memtable::key_values::KeyValues; +pub use crate::memtable::key_values::KeyValues; use crate::metadata::RegionMetadataRef; /// Id for memtables. diff --git a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs index 0fdc6d07c674..f769da498b53 100644 --- a/src/mito2/src/memtable/version.rs +++ b/src/mito2/src/memtable/version.rs @@ -37,4 +37,9 @@ impl MemtableVersion { immutables: vec![], } } + + /// Returns the mutable memtable. + pub(crate) fn mutable(&self) -> &MemtableRef { + &self.mutable + } } diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index ccdc058efee0..19672952f01c 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -25,6 +25,7 @@ use store_api::storage::{RegionId, SequenceNumber}; use tokio::sync::oneshot::Sender; use crate::error::{Error, RegionNotFoundSnafu, Result, WriteGroupSnafu}; +use crate::memtable::KeyValues; use crate::proto_util::to_proto_op_type; use crate::region::version::{VersionControlData, VersionRef}; use crate::region::MitoRegionRef; @@ -41,7 +42,7 @@ impl RegionWorkerLoop { let mut region_ctxs = self.prepare_region_write_ctx(write_requests); - // Write WAL. + // Write to WAL. let mut wal_writer = self.wal.writer(); for region_ctx in region_ctxs.values_mut() { if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) { @@ -56,7 +57,10 @@ impl RegionWorkerLoop { return; } - todo!() + // Write to memtables. + for mut region_ctx in region_ctxs.into_values() { + region_ctx.write_memtable(); + } } } @@ -111,6 +115,41 @@ fn send_result(sender: Option>>, res: Result<()>) { } } +/// Notifier to notify write result on drop. +struct WriteNotify { + /// Error to send to the waiter. + err: Option>, + /// Sender to send write result to the waiter for this mutation. + sender: Option>>, +} + +impl WriteNotify { + /// Creates a new notify from the `sender`. + fn new(sender: Option>>) -> WriteNotify { + WriteNotify { err: None, sender } + } + + /// Send result to the waiter. + fn notify_result(&mut self) { + let Some(sender) = self.sender.take() else { + return; + }; + if let Some(err) = &self.err { + // Try to send the error to waiters. + let _ = sender.send(Err(err.clone()).context(WriteGroupSnafu)); + } else { + // Send success result. + let _ = sender.send(Ok(())); + } + } +} + +impl Drop for WriteNotify { + fn drop(&mut self) { + self.notify_result(); + } +} + /// Context to keep region metadata and buffer write requests. struct RegionWriteCtx { /// Region to write. @@ -124,13 +163,14 @@ struct RegionWriteCtx { /// Next entry id of WAL to write. next_entry_id: EntryId, /// Valid WAL entry to write. + /// + /// We keep [WalEntry] instead of mutations to avoid taking mutations + /// out of the context to construct the wal entry when we write to the wal. wal_entry: WalEntry, - /// Error during writing this region. - err: Option>, - /// Result senders. + /// Notifiers to send write results to waiters. /// - /// All senders will receive the same result. - senders: Vec>>, + /// The i-th notify is for i-th mutation. + notifiers: Vec, } impl RegionWriteCtx { @@ -147,8 +187,7 @@ impl RegionWriteCtx { next_sequence: committed_sequence + 1, next_entry_id: last_entry_id + 1, wal_entry: WalEntry::default(), - err: None, - senders: Vec::new(), + notifiers: Vec::new(), } } @@ -161,9 +200,8 @@ impl RegionWriteCtx { sequence: self.next_sequence, rows: Some(sender_req.request.rows), }); - if let Some(sender) = sender_req.sender { - self.senders.push(sender); - } + // Notifiers are 1:1 map to mutations. + self.notifiers.push(WriteNotify::new(sender_req.sender)); // Increase sequence number. self.next_sequence += num_rows; @@ -174,30 +212,29 @@ impl RegionWriteCtx { wal_writer.add_entry(self.region.region_id, self.next_entry_id, &self.wal_entry) } - /// Sets error and marks the write operation is failed. - /// - /// The context will send the error to waiters on drop. + /// Sets error and marks all write operations are failed. fn set_error(&mut self, err: Arc) { - self.err = Some(err); + // Set error for all notifiers + for notify in &mut self.notifiers { + notify.err = Some(err.clone()); + } } - /// Sends result to waiters. - fn notify_result(&mut self) { - let senders = mem::take(&mut self.senders); - for sender in senders { - if let Some(err) = &self.err { - // Try to send the error to waiters. - let _ = sender.send(Err(err.clone()).context(WriteGroupSnafu)); - } else { - // Send success result. - let _ = sender.send(Ok(())); + /// Consumes mutations and writes them into mutable memtable. + fn write_memtable(&mut self) { + debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len()); + + let mutable = self.version.memtables.mutable(); + // Takes mutations from the wal entry. + let mutations = mem::take(&mut self.wal_entry.mutations); + for (mutation, notify) in mutations.into_iter().zip(&mut self.notifiers) { + // Write mutation to the memtable. + let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else { + continue; + }; + if let Err(e) = mutable.write(&kvs) { + notify.err = Some(Arc::new(e)); } } } } - -impl Drop for RegionWriteCtx { - fn drop(&mut self) { - self.notify_result(); - } -} From 3a8439200394275fc6286e4d4483010f926b0ba3 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 9 Aug 2023 19:51:22 +0800 Subject: [PATCH 05/10] feat: fill missing columns --- src/mito2/src/engine.rs | 19 +++++++++++++------ src/mito2/src/error.rs | 16 +++++++++------- src/mito2/src/region.rs | 7 +++++++ src/mito2/src/request.rs | 6 +----- src/mito2/src/worker.rs | 14 +++++++++++++- src/mito2/src/worker/handle_write.rs | 26 +++++++++++++++++++++----- 6 files changed, 64 insertions(+), 24 deletions(-) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index f772795afa53..87a70dd97709 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -20,12 +20,12 @@ mod tests; use std::sync::Arc; use object_store::ObjectStore; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; use store_api::storage::RegionId; use crate::config::MitoConfig; -use crate::error::{RecvSnafu, Result}; +use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result}; use crate::request::{ CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest, }; @@ -90,12 +90,19 @@ impl MitoEngine { } /// Write to a region. - pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> { + pub async fn write_region(&self, mut write_request: WriteRequest) -> Result<()> { write_request.validate()?; - // TODO(yingwen): Fill default values. - // We need to fill default values before writing it to WAL so we can get - // the same default value after reopening the region. + let region = self + .inner + .workers + .get_region(write_request.region_id) + .context(RegionNotFoundSnafu { + region_id: write_request.region_id, + })?; + let metadata = region.metadata(); + + write_request.fill_missing_columns(&metadata)?; self.inner .handle_request_body(RequestBody::Write(write_request)) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index fc99ff96b910..df8aa441a117 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -186,15 +186,10 @@ pub enum Error { /// An error type to indicate that schema is changed and we need /// to fill default values again. - #[snafu(display( - "Need to fill default value to column {} of region {}", - column, - region_id - ))] + #[snafu(display("Need to fill default value for region {}", region_id))] FillDefault { region_id: RegionId, - column: String, - // The error is for retry purpose so we don't need a location. + // The error is for internal use so we don't need a location. }, #[snafu(display( @@ -269,6 +264,13 @@ pub enum Error { pub type Result = std::result::Result; +impl Error { + /// Returns true if we need to fill default value for a region. + pub(crate) fn is_fill_default(&self) -> bool { + matches!(self, Error::FillDefault { .. }) + } +} + impl ErrorExt for Error { fn status_code(&self) -> StatusCode { use Error::*; diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index d200ce2a8bed..b57bcbfd1d1d 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -25,6 +25,7 @@ use store_api::storage::RegionId; use crate::error::Result; use crate::manifest::manager::RegionManifestManager; +use crate::metadata::RegionMetadataRef; use crate::region::version::VersionControlRef; /// Type to store region version. @@ -56,6 +57,12 @@ impl MitoRegion { Ok(()) } + + /// Returns current metadata of the region. + pub(crate) fn metadata(&self) -> RegionMetadataRef { + let version_data = self.version_control.current(); + version_data.version.metadata.clone() + } } /// Regions indexed by ids. diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 7a303027909e..4b35a04aadc9 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -190,11 +190,7 @@ impl WriteRequest { } ); - return FillDefaultSnafu { - region_id, - column: &column.column_schema.name, - } - .fail(); + return FillDefaultSnafu { region_id }.fail(); } } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 5d94e4df5cf7..a26d657a1652 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -37,7 +37,7 @@ use tokio::sync::{mpsc, Mutex}; use crate::config::MitoConfig; use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu}; use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef}; -use crate::region::{RegionMap, RegionMapRef}; +use crate::region::{MitoRegionRef, RegionMap, RegionMapRef}; use crate::request::{RegionRequest, RequestBody, SenderWriteRequest, WorkerRequest}; use crate::wal::Wal; @@ -133,6 +133,13 @@ impl WorkerGroup { self.worker(region_id).is_region_exists(region_id) } + /// Returns region of specific `region_id`. + /// + /// This method should not be public. + pub(crate) fn get_region(&self, region_id: RegionId) -> Option { + self.worker(region_id).get_region(region_id) + } + /// Get worker for specific `region_id`. fn worker(&self, region_id: RegionId) -> &RegionWorker { let mut hasher = DefaultHasher::new(); @@ -252,6 +259,11 @@ impl RegionWorker { fn is_region_exists(&self, region_id: RegionId) -> bool { self.regions.is_region_exists(region_id) } + + /// Returns region of specific `region_id`. + fn get_region(&self, region_id: RegionId) -> Option { + self.regions.get_region(region_id) + } } impl Drop for RegionWorker { diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 19672952f01c..8ddad90c11c6 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -26,10 +26,11 @@ use tokio::sync::oneshot::Sender; use crate::error::{Error, RegionNotFoundSnafu, Result, WriteGroupSnafu}; use crate::memtable::KeyValues; +use crate::metadata::RegionMetadata; use crate::proto_util::to_proto_op_type; use crate::region::version::{VersionControlData, VersionRef}; use crate::region::MitoRegionRef; -use crate::request::SenderWriteRequest; +use crate::request::{SenderWriteRequest, WriteRequest}; use crate::wal::{EntryId, WalWriter}; use crate::worker::RegionWorkerLoop; @@ -71,7 +72,7 @@ impl RegionWorkerLoop { write_requests: Vec, ) -> HashMap { let mut region_ctxs = HashMap::new(); - for sender_req in write_requests { + for mut sender_req in write_requests { let region_id = sender_req.request.region_id; // Checks whether the region exists. if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) { @@ -90,9 +91,8 @@ impl RegionWorkerLoop { let region_ctx = region_ctxs.get_mut(®ion_id).unwrap(); // Checks whether request schema is compatible with region schema. - if let Err(e) = sender_req - .request - .check_schema(®ion_ctx.version.metadata) + if let Err(e) = + maybe_fill_missing_columns(&mut sender_req.request, ®ion_ctx.version.metadata) { send_result(sender_req.sender, Err(e)); @@ -107,6 +107,22 @@ impl RegionWorkerLoop { } } +/// Checks the schema and fill missing columns. +fn maybe_fill_missing_columns(request: &mut WriteRequest, metadata: &RegionMetadata) -> Result<()> { + if let Err(e) = request.check_schema(metadata) { + if e.is_fill_default() { + // TODO(yingwen): Add metrics for this case. + // We need to fill default value again. The write request may be a request + // sent before changing the schema. + request.fill_missing_columns(metadata)?; + } else { + return Err(e); + } + } + + Ok(()) +} + /// Send result to the request. fn send_result(sender: Option>>, res: Result<()>) { if let Some(sender) = sender { From 84ac1cb9a6ce8a97adf89f8533ae8d1a71625ebc Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 9 Aug 2023 21:41:22 +0800 Subject: [PATCH 06/10] feat: validate write request --- src/mito2/src/proto_util.rs | 35 ++++++++++++++++ src/mito2/src/request.rs | 83 +++++++++++++++++++++++++++++-------- 2 files changed, 100 insertions(+), 18 deletions(-) diff --git a/src/mito2/src/proto_util.rs b/src/mito2/src/proto_util.rs index 6884dff3604d..1f8f6f0add68 100644 --- a/src/mito2/src/proto_util.rs +++ b/src/mito2/src/proto_util.rs @@ -120,6 +120,39 @@ pub(crate) fn to_proto_value(value: Value) -> Option { Some(proto_value) } +/// Returns the [ColumnDataType] of the value. +/// +/// If value is null, returns `None`. +pub(crate) fn proto_value_type(value: &v1::Value) -> Option { + let value_data = value.value.as_ref()?; + let value_type = match value_data { + v1::value::Value::I8Value(_) => ColumnDataType::Int8, + v1::value::Value::I16Value(_) => ColumnDataType::Int16, + v1::value::Value::I32Value(_) => ColumnDataType::Int32, + v1::value::Value::I64Value(_) => ColumnDataType::Int64, + v1::value::Value::U8Value(_) => ColumnDataType::Uint8, + v1::value::Value::U16Value(_) => ColumnDataType::Uint16, + v1::value::Value::U32Value(_) => ColumnDataType::Uint32, + v1::value::Value::U64Value(_) => ColumnDataType::Uint64, + v1::value::Value::F32Value(_) => ColumnDataType::Float32, + v1::value::Value::F64Value(_) => ColumnDataType::Float64, + v1::value::Value::BoolValue(_) => ColumnDataType::Boolean, + v1::value::Value::BinaryValue(_) => ColumnDataType::Binary, + v1::value::Value::StringValue(_) => ColumnDataType::String, + v1::value::Value::DateValue(_) => ColumnDataType::Date, + v1::value::Value::DatetimeValue(_) => ColumnDataType::Datetime, + v1::value::Value::TsSecondValue(_) => ColumnDataType::TimestampSecond, + v1::value::Value::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond, + v1::value::Value::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond, + v1::value::Value::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond, + v1::value::Value::TimeSecondValue(_) => ColumnDataType::TimeSecond, + v1::value::Value::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond, + v1::value::Value::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond, + v1::value::Value::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond, + }; + Some(value_type) +} + /// Convert [ConcreteDataType] to [ColumnDataType]. pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option { let column_data_type = match data_type { @@ -186,3 +219,5 @@ fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType false } } + +// TODO(yingwen): Tests. diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 4b35a04aadc9..f6bcacd4bb3f 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use std::time::Duration; use common_base::readable_size::ReadableSize; -use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows}; +use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows, Value}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{ColumnId, CompactionStrategy, OpType, RegionId}; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -27,8 +27,8 @@ use crate::config::DEFAULT_WRITE_BUFFER_SIZE; use crate::error::{CreateDefaultSnafu, FillDefaultSnafu, InvalidRequestSnafu, Result}; use crate::metadata::{ColumnMetadata, RegionMetadata}; use crate::proto_util::{ - is_column_type_value_eq, is_semantic_type_eq, to_column_data_type, to_proto_semantic_type, - to_proto_value, + is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type, + to_proto_semantic_type, to_proto_value, }; /// Options that affect the entire region. @@ -104,28 +104,51 @@ pub struct WriteRequest { impl WriteRequest { /// Returns a new request. - pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> WriteRequest { - let name_to_index = rows - .schema - .iter() - .enumerate() - .map(|(index, column)| (column.column_name.clone(), index)) - .collect(); - WriteRequest { + pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> Result { + let mut name_to_index = HashMap::with_capacity(rows.schema.len()); + for (index, column) in rows.schema.iter().enumerate() { + ensure!( + name_to_index + .insert(column.column_name.clone(), index) + .is_none(), + InvalidRequestSnafu { + region_id, + reason: format!("duplicate column {}", column.column_name), + } + ); + } + + Ok(WriteRequest { region_id, op_type, rows, name_to_index, - } + }) } - /// Validate the request. + /// Validates the request. + /// + /// Ensures rows match the schema. pub(crate) fn validate(&self) -> Result<()> { - // - checks whether the request is too large. - // - checks whether each row in rows has the same schema. - // - checks whether each column match the schema in Rows. - // - checks rows don't have duplicate columns. - unimplemented!() + for row in &self.rows.rows { + ensure!( + row.values.len() == self.rows.schema.len(), + InvalidRequestSnafu { + region_id: self.region_id, + reason: format!( + "row has {} columns but schema has {}", + row.values.len(), + self.rows.schema.len() + ), + } + ); + + for (value, column_schema) in row.values.iter().zip(&self.rows.schema) { + validate_proto_value(self.region_id, value, column_schema)?; + } + } + + Ok(()) } /// Get column index by name. @@ -274,6 +297,30 @@ impl WriteRequest { } } +/// Validate proto value schema. +pub(crate) fn validate_proto_value( + region_id: RegionId, + value: &Value, + column_schema: &ColumnSchema, +) -> Result<()> { + if let Some(value_type) = proto_value_type(value) { + ensure!( + value_type as i32 == column_schema.datatype, + InvalidRequestSnafu { + region_id, + reason: format!( + "column {} has type {:?}, but schema has type {:?}", + column_schema.column_name, + value_type, + ColumnDataType::from_i32(column_schema.datatype) + ), + } + ); + } + + Ok(()) +} + /// Sender and write request. pub(crate) struct SenderWriteRequest { /// Result sender. From 1e791d93e1cd03b5c01511e257ed813766bff094 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 9 Aug 2023 23:00:08 +0800 Subject: [PATCH 07/10] feat: more validation to write request --- src/mito2/src/engine.rs | 2 - src/mito2/src/memtable/key_values.rs | 9 +- src/mito2/src/proto_util.rs | 17 ++ src/mito2/src/request.rs | 333 ++++++++++++++++++++++++++- 4 files changed, 343 insertions(+), 18 deletions(-) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 87a70dd97709..30e6deefd358 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -91,8 +91,6 @@ impl MitoEngine { /// Write to a region. pub async fn write_region(&self, mut write_request: WriteRequest) -> Result<()> { - write_request.validate()?; - let region = self .inner .workers diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs index 4ea70c3eae76..dbd8e12b9157 100644 --- a/src/mito2/src/memtable/key_values.rs +++ b/src/mito2/src/memtable/key_values.rs @@ -190,11 +190,12 @@ mod tests { use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use greptime_proto::v1; - use greptime_proto::v1::{value, ColumnDataType, Value}; + use greptime_proto::v1::ColumnDataType; use store_api::storage::RegionId; use super::*; use crate::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use crate::proto_util::i64_value; const TS_NAME: &str = "ts"; const START_SEQ: SequenceNumber = 100; @@ -290,12 +291,6 @@ mod tests { } } - fn i64_value(data: i64) -> Value { - Value { - value: Some(value::Value::I64Value(data)), - } - } - fn check_key_values(kvs: &KeyValues, num_rows: usize, keys: &[i64], ts: i64, values: &[i64]) { assert_eq!(num_rows, kvs.num_rows()); let mut expect_seq = START_SEQ; diff --git a/src/mito2/src/proto_util.rs b/src/mito2/src/proto_util.rs index 1f8f6f0add68..d2de210c8330 100644 --- a/src/mito2/src/proto_util.rs +++ b/src/mito2/src/proto_util.rs @@ -153,6 +153,23 @@ pub(crate) fn proto_value_type(value: &v1::Value) -> Option { Some(value_type) } +// TODO(yingwen): Support conversion in greptime-proto. +/// Creates value for i64. +#[cfg(test)] +pub(crate) fn i64_value(data: i64) -> v1::Value { + v1::Value { + value: Some(v1::value::Value::I64Value(data)), + } +} + +/// Creates value for timestamp millis. +#[cfg(test)] +pub(crate) fn ts_ms_value(data: i64) -> v1::Value { + v1::Value { + value: Some(v1::value::Value::TsMillisecondValue(data)), + } +} + /// Convert [ConcreteDataType] to [ColumnDataType]. pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option { let column_data_type = match data_type { diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index f6bcacd4bb3f..d70232bcc49d 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -100,6 +100,8 @@ pub struct WriteRequest { pub rows: Rows, /// Map column name to column index in `rows`. name_to_index: HashMap, + /// Whether each column has null. + has_null: Vec, } impl WriteRequest { @@ -118,18 +120,23 @@ impl WriteRequest { ); } - Ok(WriteRequest { + let has_null = vec![false; rows.schema.len()]; + let mut request = WriteRequest { region_id, op_type, rows, name_to_index, - }) + has_null, + }; + request.init()?; + + Ok(request) } - /// Validates the request. + /// Initailizes and validates the request. /// /// Ensures rows match the schema. - pub(crate) fn validate(&self) -> Result<()> { + fn init(&mut self) -> Result<()> { for row in &self.rows.rows { ensure!( row.values.len() == self.rows.schema.len(), @@ -143,8 +150,13 @@ impl WriteRequest { } ); - for (value, column_schema) in row.values.iter().zip(&self.rows.schema) { + for (i, (value, column_schema)) in row.values.iter().zip(&self.rows.schema).enumerate() + { validate_proto_value(self.region_id, value, column_schema)?; + + if value.value.is_none() { + self.has_null[i] = true; + } } } @@ -179,10 +191,12 @@ impl WriteRequest { InvalidRequestSnafu { region_id, reason: format!( - "column {} expect type {:?}, given: {:?}({})", + "column {} expect type {:?}, given: {}({})", column.column_schema.name, column.column_schema.data_type, - ColumnDataType::from_i32(input_col.datatype), + ColumnDataType::from_i32(input_col.datatype) + .map(|v| v.as_str_name()) + .unwrap_or("Unknown"), input_col.datatype, ) } @@ -194,14 +208,27 @@ impl WriteRequest { InvalidRequestSnafu { region_id, reason: format!( - "column {} has semantic type {:?}, given: {:?}({})", + "column {} has semantic type {:?}, given: {}({})", column.column_schema.name, column.semantic_type, - greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type), + greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type) + .map(|v| v.as_str_name()) + .unwrap_or("Unknown"), input_col.semantic_type ), } ); + + // Check nullable. + // Safety: `rows_columns` ensures this column exists. + let has_null = self.has_null[self.name_to_index[&column.column_schema.name]]; + ensure!( + !has_null || column.column_schema.is_nullable(), + InvalidRequestSnafu { + region_id, + reason: format!("column {} is not null", column.column_schema.name), + } + ); } else { // For columns not in rows, checks whether they have default value. ensure!( @@ -405,3 +432,291 @@ impl RequestBody { } } } + +#[cfg(test)] +mod tests { + use datatypes::prelude::ConcreteDataType; + use greptime_proto::v1::{Row, SemanticType}; + + use super::*; + use crate::error::Error; + use crate::metadata::RegionMetadataBuilder; + use crate::proto_util::{i64_value, ts_ms_value}; + + fn new_column_schema( + name: &str, + data_type: ColumnDataType, + semantic_type: SemanticType, + ) -> ColumnSchema { + ColumnSchema { + column_name: name.to_string(), + datatype: data_type as i32, + semantic_type: semantic_type as i32, + } + } + + fn check_invalid_request(err: &Error, expect: &str) { + if let Error::InvalidRequest { + region_id: _, + reason, + location: _, + } = err + { + assert_eq!(reason, expect); + } else { + panic!("Unexpected error {err}") + } + } + + #[test] + fn test_write_request_duplicate_column() { + let rows = Rows { + schema: vec![ + new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![], + }; + + let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap_err(); + check_invalid_request(&err, "duplicate column c0"); + } + + #[test] + fn test_valid_write_request() { + let rows = Rows { + schema: vec![ + new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![i64_value(1), i64_value(2)], + }], + }; + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + assert_eq!(0, request.column_index_by_name("c0").unwrap()); + assert_eq!(1, request.column_index_by_name("c1").unwrap()); + assert_eq!(None, request.column_index_by_name("c2")); + } + + #[test] + fn test_write_request_column_num() { + let rows = Rows { + schema: vec![ + new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![i64_value(1), i64_value(2), i64_value(3)], + }], + }; + + let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap_err(); + check_invalid_request(&err, "row has 3 columns but schema has 2"); + } + + fn new_region_metadata() -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1), 1); + builder + .push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: crate::metadata::SemanticType::Timestamp, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "k0", + ConcreteDataType::int64_datatype(), + true, + ), + semantic_type: crate::metadata::SemanticType::Tag, + column_id: 2, + }) + .primary_key(vec![2]); + builder.build().unwrap() + } + + #[test] + fn test_check_schema() { + let rows = Rows { + schema: vec![ + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + ), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![ts_ms_value(1), i64_value(2)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + request.check_schema(&metadata).unwrap(); + } + + #[test] + fn test_column_type() { + let rows = Rows { + schema: vec![ + new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![i64_value(1), i64_value(2)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)"); + } + + #[test] + fn test_semantic_type() { + let rows = Rows { + schema: vec![ + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Tag, + ), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![ts_ms_value(1), i64_value(2)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)"); + } + + #[test] + fn test_column_nullable() { + let rows = Rows { + schema: vec![ + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + ), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![Value { value: None }, i64_value(2)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, "column ts is not null"); + } + + #[test] + fn test_column_default() { + let rows = Rows { + schema: vec![new_column_schema( + "k0", + ColumnDataType::Int64, + SemanticType::Tag, + )], + rows: vec![Row { + values: vec![i64_value(1)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, "missing column ts"); + } + + #[test] + fn test_unknown_column() { + let rows = Rows { + schema: vec![ + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + ), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![ts_ms_value(1), i64_value(2), i64_value(3)], + }], + }; + let metadata = new_region_metadata(); + + let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + check_invalid_request(&err, r#"unknown columns: ["k1"]"#); + } + + #[test] + fn test_fill_missing_columns() { + let rows = Rows { + schema: vec![new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + )], + rows: vec![Row { + values: vec![ts_ms_value(1)], + }], + }; + let metadata = new_region_metadata(); + + let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.check_schema(&metadata).unwrap_err(); + assert!(err.is_fill_default()); + request.fill_missing_columns(&metadata).unwrap(); + + let expect_rows = Rows { + schema: vec![ + new_column_schema( + "ts", + ColumnDataType::TimestampMillisecond, + SemanticType::Timestamp, + ), + new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag), + ], + rows: vec![Row { + values: vec![ts_ms_value(1), Value { value: None }], + }], + }; + assert_eq!(expect_rows, request.rows); + } + + #[test] + fn test_no_default() { + let rows = Rows { + schema: vec![new_column_schema( + "k0", + ColumnDataType::Int64, + SemanticType::Tag, + )], + rows: vec![Row { + values: vec![i64_value(1)], + }], + }; + let metadata = new_region_metadata(); + + let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap(); + let err = request.fill_missing_columns(&metadata).unwrap_err(); + check_invalid_request(&err, "column ts does not have default value"); + } +} From 28e733362cd1fa7b0ada3964fbeaefa2d8ce1e07 Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 9 Aug 2023 23:07:39 +0800 Subject: [PATCH 08/10] chore: fix typos --- src/mito2/src/request.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index d70232bcc49d..bed1c1782a3c 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -133,7 +133,7 @@ impl WriteRequest { Ok(request) } - /// Initailizes and validates the request. + /// Initializes and validates the request. /// /// Ensures rows match the schema. fn init(&mut self) -> Result<()> { From 1e9f5bfcaa9f5dfdc0440e541aa7704111e74d8c Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 12 Aug 2023 13:59:28 +0800 Subject: [PATCH 09/10] feat: remove init and validate rows in new() --- src/mito2/src/request.rs | 47 ++++++++++++++++------------------------ 1 file changed, 19 insertions(+), 28 deletions(-) diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index bed1c1782a3c..42b928d606ce 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -105,7 +105,9 @@ pub struct WriteRequest { } impl WriteRequest { - /// Returns a new request. + /// Creates a new request. + /// + /// Returns `Err` if `rows` are invalid. pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> Result { let mut name_to_index = HashMap::with_capacity(rows.schema.len()); for (index, column) in rows.schema.iter().enumerate() { @@ -120,47 +122,36 @@ impl WriteRequest { ); } - let has_null = vec![false; rows.schema.len()]; - let mut request = WriteRequest { - region_id, - op_type, - rows, - name_to_index, - has_null, - }; - request.init()?; - - Ok(request) - } - - /// Initializes and validates the request. - /// - /// Ensures rows match the schema. - fn init(&mut self) -> Result<()> { - for row in &self.rows.rows { + let mut has_null = vec![false; rows.schema.len()]; + for row in &rows.rows { ensure!( - row.values.len() == self.rows.schema.len(), + row.values.len() == rows.schema.len(), InvalidRequestSnafu { - region_id: self.region_id, + region_id: region_id, reason: format!( "row has {} columns but schema has {}", row.values.len(), - self.rows.schema.len() + rows.schema.len() ), } ); - for (i, (value, column_schema)) in row.values.iter().zip(&self.rows.schema).enumerate() - { - validate_proto_value(self.region_id, value, column_schema)?; + for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() { + validate_proto_value(region_id, value, column_schema)?; if value.value.is_none() { - self.has_null[i] = true; + has_null[i] = true; } } } - Ok(()) + Ok(WriteRequest { + region_id, + op_type, + rows, + name_to_index, + has_null, + }) } /// Get column index by name. @@ -168,7 +159,7 @@ impl WriteRequest { self.name_to_index.get(name).copied() } - /// Checks schema of rows. + /// Checks schema of rows is compatible with schema of the region. /// /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault) /// error. From 77feea8f1cd3a0ad74be1693ce73d3ce68bdf447 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 12 Aug 2023 14:10:30 +0800 Subject: [PATCH 10/10] style: fix clippy --- src/mito2/src/request.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 42b928d606ce..4e5b3fa7fb04 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -127,7 +127,7 @@ impl WriteRequest { ensure!( row.values.len() == rows.schema.len(), InvalidRequestSnafu { - region_id: region_id, + region_id, reason: format!( "row has {} columns but schema has {}", row.values.len(),