Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): Write wal and memtable #2135

Merged
merged 10 commits into from
Aug 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ mod tests;
use std::sync::Arc;

use object_store::ObjectStore;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::error::{RecvSnafu, Result};
use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result};
use crate::request::{
CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest,
};
Expand Down Expand Up @@ -90,12 +90,17 @@ impl MitoEngine {
}

/// Write to a region.
pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> {
write_request.validate()?;

// TODO(yingwen): Fill default values.
// We need to fill default values before writing it to WAL so we can get
// the same default value after reopening the region.
pub async fn write_region(&self, mut write_request: WriteRequest) -> Result<()> {
let region = self
.inner
.workers
.get_region(write_request.region_id)
.context(RegionNotFoundSnafu {
region_id: write_request.region_id,
})?;
let metadata = region.metadata();

write_request.fill_missing_columns(&metadata)?;

self.inner
.handle_request_body(RequestBody::Write(write_request))
Expand Down
22 changes: 15 additions & 7 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::any::Any;
use std::sync::Arc;

use common_datasource::compression::CompressionType;
use common_error::ext::{BoxedError, ErrorExt};
Expand Down Expand Up @@ -185,15 +186,10 @@ pub enum Error {

/// An error type to indicate that schema is changed and we need
/// to fill default values again.
#[snafu(display(
"Need to fill default value to column {} of region {}",
column,
region_id
))]
#[snafu(display("Need to fill default value for region {}", region_id))]
FillDefault {
region_id: RegionId,
column: String,
// The error is for retry purpose so we don't need a location.
// The error is for internal use so we don't need a location.
},

#[snafu(display(
Expand Down Expand Up @@ -260,10 +256,21 @@ pub enum Error {
location: Location,
source: BoxedError,
},

// Shared error for each writer in the write group.
#[snafu(display("Failed to write region, source: {}", source))]
WriteGroup { source: Arc<Error> },
}

pub type Result<T> = std::result::Result<T, Error>;

impl Error {
/// Returns true if we need to fill default value for a region.
pub(crate) fn is_fill_default(&self) -> bool {
evenyag marked this conversation as resolved.
Show resolved Hide resolved
matches!(self, Error::FillDefault { .. })
}
}

impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
Expand Down Expand Up @@ -296,6 +303,7 @@ impl ErrorExt for Error {
| EncodeWal { .. }
| DecodeWal { .. } => StatusCode::Internal,
WriteBuffer { source, .. } => source.status_code(),
WriteGroup { source, .. } => source.status_code(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use crate::error::Result;
use crate::memtable::key_values::KeyValues;
pub use crate::memtable::key_values::KeyValues;
use crate::metadata::RegionMetadataRef;

/// Id for memtables.
Expand Down
9 changes: 2 additions & 7 deletions src/mito2/src/memtable/key_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,12 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use greptime_proto::v1;
use greptime_proto::v1::{value, ColumnDataType, Value};
use greptime_proto::v1::ColumnDataType;
use store_api::storage::RegionId;

use super::*;
use crate::metadata::{ColumnMetadata, RegionMetadataBuilder};
use crate::proto_util::i64_value;

const TS_NAME: &str = "ts";
const START_SEQ: SequenceNumber = 100;
Expand Down Expand Up @@ -290,12 +291,6 @@ mod tests {
}
}

/// Test helper: builds a proto [Value] whose inner value is `I64Value(data)`.
fn i64_value(data: i64) -> Value {
Value {
value: Some(value::Value::I64Value(data)),
}
}

fn check_key_values(kvs: &KeyValues, num_rows: usize, keys: &[i64], ts: i64, values: &[i64]) {
assert_eq!(num_rows, kvs.num_rows());
let mut expect_seq = START_SEQ;
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/memtable/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ impl MemtableVersion {
immutables: vec![],
}
}

/// Returns a reference to the mutable memtable held by this version.
///
/// The returned [MemtableRef] is borrowed from `self`; callers clone it if
/// they need an owned handle.
pub(crate) fn mutable(&self) -> &MemtableRef {
&self.mutable
}
}
52 changes: 52 additions & 0 deletions src/mito2/src/proto_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,56 @@ pub(crate) fn to_proto_value(value: Value) -> Option<v1::Value> {
Some(proto_value)
}

/// Returns the [ColumnDataType] of the value.
///
/// If value is null, returns `None`.
pub(crate) fn proto_value_type(value: &v1::Value) -> Option<ColumnDataType> {
let value_data = value.value.as_ref()?;
let value_type = match value_data {
v1::value::Value::I8Value(_) => ColumnDataType::Int8,
v1::value::Value::I16Value(_) => ColumnDataType::Int16,
v1::value::Value::I32Value(_) => ColumnDataType::Int32,
v1::value::Value::I64Value(_) => ColumnDataType::Int64,
v1::value::Value::U8Value(_) => ColumnDataType::Uint8,
v1::value::Value::U16Value(_) => ColumnDataType::Uint16,
v1::value::Value::U32Value(_) => ColumnDataType::Uint32,
v1::value::Value::U64Value(_) => ColumnDataType::Uint64,
v1::value::Value::F32Value(_) => ColumnDataType::Float32,
v1::value::Value::F64Value(_) => ColumnDataType::Float64,
v1::value::Value::BoolValue(_) => ColumnDataType::Boolean,
v1::value::Value::BinaryValue(_) => ColumnDataType::Binary,
v1::value::Value::StringValue(_) => ColumnDataType::String,
v1::value::Value::DateValue(_) => ColumnDataType::Date,
v1::value::Value::DatetimeValue(_) => ColumnDataType::Datetime,
v1::value::Value::TsSecondValue(_) => ColumnDataType::TimestampSecond,
v1::value::Value::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond,
v1::value::Value::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond,
v1::value::Value::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond,
v1::value::Value::TimeSecondValue(_) => ColumnDataType::TimeSecond,
v1::value::Value::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond,
v1::value::Value::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond,
v1::value::Value::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond,
};
Some(value_type)
}

// TODO(yingwen): Support conversion in greptime-proto.
/// Test helper: wraps `data` into a protobuf [v1::Value] holding the
/// `I64Value` variant.
#[cfg(test)]
pub(crate) fn i64_value(data: i64) -> v1::Value {
    let inner = v1::value::Value::I64Value(data);
    v1::Value { value: Some(inner) }
}

/// Test helper: wraps `data` into a protobuf [v1::Value] holding the
/// `TsMillisecondValue` variant.
#[cfg(test)]
pub(crate) fn ts_ms_value(data: i64) -> v1::Value {
    let inner = v1::value::Value::TsMillisecondValue(data);
    v1::Value { value: Some(inner) }
}

/// Convert [ConcreteDataType] to [ColumnDataType].
pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option<ColumnDataType> {
let column_data_type = match data_type {
Expand Down Expand Up @@ -186,3 +236,5 @@ fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType
false
}
}

// TODO(yingwen): Tests.
12 changes: 7 additions & 5 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use store_api::storage::RegionId;

use crate::error::Result;
use crate::manifest::manager::RegionManifestManager;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::metadata::RegionMetadataRef;
use crate::region::version::VersionControlRef;

/// Type to store region version.
pub type VersionNumber = u32;
Expand All @@ -40,7 +41,7 @@ pub(crate) struct MitoRegion {
pub(crate) region_id: RegionId,

/// Version controller for this region.
version_control: VersionControlRef,
pub(crate) version_control: VersionControlRef,
/// Manager to maintain manifest for this region.
manifest_manager: RegionManifestManager,
}
Expand All @@ -57,9 +58,10 @@ impl MitoRegion {
Ok(())
}

/// Returns current version of the region.
pub(crate) fn version(&self) -> VersionRef {
self.version_control.current()
/// Returns the metadata of the region's current version.
///
/// Takes a copy of the version control data and clones the metadata handle
/// out of its version.
pub(crate) fn metadata(&self) -> RegionMetadataRef {
    self.version_control.current().version.metadata.clone()
}
}

Expand Down
35 changes: 26 additions & 9 deletions src/mito2/src/region/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,56 @@
//! Reason: data may be flushed/compacted, and some data with an old sequence may be removed
//! and become invisible between steps 1 and 2, so we need to acquire the version first.

use std::sync::Arc;
use std::sync::{Arc, RwLock};

use arc_swap::ArcSwap;
use store_api::storage::SequenceNumber;

use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
use crate::memtable::MemtableRef;
use crate::metadata::RegionMetadataRef;
use crate::sst::version::{SstVersion, SstVersionRef};
use crate::wal::EntryId;

/// Controls version of in memory metadata for a region.
/// Controls metadata and sequence numbers for a region.
///
/// It manages metadata in a copy-on-write fashion. Any modification to a region's metadata
/// will generate a new [Version].
#[derive(Debug)]
pub(crate) struct VersionControl {
/// Latest version.
version: ArcSwap<Version>,
data: RwLock<VersionControlData>,
}

impl VersionControl {
/// Returns a new [VersionControl] with specific `version`.
pub(crate) fn new(version: Version) -> VersionControl {
VersionControl {
version: ArcSwap::new(Arc::new(version)),
data: RwLock::new(VersionControlData {
version: Arc::new(version),
committed_sequence: 0,
last_entry_id: 0,
}),
}
}

/// Returns current [Version].
pub(crate) fn current(&self) -> VersionRef {
self.version.load_full()
/// Returns current copy of data.
pub(crate) fn current(&self) -> VersionControlData {
self.data.read().unwrap().clone()
}
}

pub(crate) type VersionControlRef = Arc<VersionControl>;

/// Data of [VersionControl].
///
/// Bundles the latest version with the sequence and WAL bookkeeping of a region.
/// `VersionControl::current` returns a clone of this struct.
#[derive(Debug, Clone)]
pub(crate) struct VersionControlData {
/// Latest version.
pub(crate) version: VersionRef,
/// Sequence number of last committed data.
pub(crate) committed_sequence: SequenceNumber,
/// Last WAL entry Id.
pub(crate) last_entry_id: EntryId,
}

/// Static metadata of a region.
#[derive(Clone, Debug)]
pub(crate) struct Version {
Expand Down
Loading
Loading