diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 70ece0f3bb06..66335122b36f 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -434,6 +434,18 @@ pub enum Error {
         region_id: RegionId,
         location: Location,
     },
+
+    #[snafu(display(
+        "Failed to compat readers for region {}, reason: {}, location: {}",
+        region_id,
+        reason,
+        location
+    ))]
+    CompatReader {
+        region_id: RegionId,
+        reason: String,
+        location: Location,
+    },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -500,6 +512,7 @@ impl ErrorExt for Error {
             RegionDropped { .. } => StatusCode::Cancelled,
             RegionClosed { .. } => StatusCode::Cancelled,
             RejectWrite { .. } => StatusCode::StorageUnavailable,
+            CompatReader { .. } => StatusCode::Unexpected,
         }
     }
 
diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index b42072624179..d15a197e5b63 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -14,11 +14,13 @@
 //! Common structs and utilities for reading data.
 
+pub mod compat;
 pub mod merge;
-pub(crate) mod projection;
+pub mod projection;
 pub(crate) mod scan_region;
 pub(crate) mod seq_scan;
 
+use std::collections::HashSet;
 use std::sync::Arc;
 
 use api::v1::OpType;
@@ -34,6 +36,7 @@ use datatypes::vectors::{
     BooleanVector, Helper, UInt32Vector, UInt64Vector, UInt8Vector, Vector, VectorRef,
 };
 use snafu::{ensure, OptionExt, ResultExt};
+use store_api::metadata::RegionMetadata;
 use store_api::storage::{ColumnId, SequenceNumber};
 
 use crate::error::{
@@ -41,10 +44,10 @@ use crate::error::{
 };
 use crate::memtable::BoxedBatchIterator;
 
-/// Storage internal representation of a batch of rows
-/// for a primary key (time series).
+/// Storage internal representation of a batch of rows for a primary key (time series).
 ///
-/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc.
+/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
+/// always keep the same relative order as fields in [RegionMetadata](store_api::metadata::RegionMetadata).
 #[derive(Debug, PartialEq, Clone)]
 pub struct Batch {
     /// Primary key encoded in a comparable form.
@@ -77,6 +80,17 @@ impl Batch {
             .build()
     }
 
+    /// Tries to set fields for the batch.
+    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
+        Batch::new(
+            self.primary_key,
+            self.timestamps,
+            self.sequences,
+            self.op_types,
+            fields,
+        )
+    }
+
     /// Returns primary key of the batch.
     pub fn primary_key(&self) -> &[u8] {
         &self.primary_key
     }
@@ -150,6 +164,11 @@ impl Batch {
         Some(self.get_sequence(self.sequences.len() - 1))
     }
 
+    /// Replaces the primary key of the batch.
+    pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
+        self.primary_key = primary_key;
+    }
+
     /// Slice the batch, returning a new batch.
     ///
     /// # Panics
@@ -202,15 +221,22 @@ impl Batch {
                 reason: "batches have different primary key",
             }
         );
-        ensure!(
-            batches
-                .iter()
-                .skip(1)
-                .all(|b| b.fields().len() == first.fields().len()),
-            InvalidBatchSnafu {
-                reason: "batches have different field num",
+        for b in batches.iter().skip(1) {
+            ensure!(
+                b.fields.len() == first.fields.len(),
+                InvalidBatchSnafu {
+                    reason: "batches have different field num",
+                }
+            );
+            for (l, r) in b.fields.iter().zip(&first.fields) {
+                ensure!(
+                    l.column_id == r.column_id,
+                    InvalidBatchSnafu {
+                        reason: "batches have different fields",
+                    }
+                );
             }
-        );
+        }
 
         // We take the primary key from the first batch.
         let mut builder = BatchBuilder::new(primary_key);
@@ -311,6 +337,24 @@ impl Batch {
         self.take_in_place(&indices)
     }
 
+    /// Returns ids of fields in the [Batch] after applying the `projection`.
+    pub(crate) fn projected_fields(
+        metadata: &RegionMetadata,
+        projection: &[ColumnId],
+    ) -> Vec<ColumnId> {
+        let projected_ids: HashSet<_> = projection.iter().copied().collect();
+        metadata
+            .field_columns()
+            .filter_map(|column| {
+                if projected_ids.contains(&column.column_id) {
+                    Some(column.column_id)
+                } else {
+                    None
+                }
+            })
+            .collect()
+    }
+
     /// Takes the batch in place.
     fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
         self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
@@ -566,8 +610,6 @@ impl Source {
 /// The reader must guarantee [Batch]es returned by it have the same schema.
 #[async_trait]
 pub trait BatchReader: Send {
-    // TODO(yingwen): fields of the batch returned.
-
     /// Fetch next [Batch].
     ///
     /// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()`
@@ -729,6 +771,37 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_concat_different_fields() {
+        let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
+        let fields = vec![
+            batch1.fields()[0].clone(),
+            BatchColumn {
+                column_id: 2,
+                data: Arc::new(UInt64Vector::from_slice([2])),
+            },
+        ];
+        // Batch 2 has more fields.
+        let batch2 = batch1.clone().with_fields(fields).unwrap();
+        let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
+        assert!(
+            matches!(err, Error::InvalidBatch { .. }),
+            "unexpected err: {err}"
+        );
+
+        // Batch 2 has different field.
+        let fields = vec![BatchColumn {
+            column_id: 2,
+            data: Arc::new(UInt64Vector::from_slice([2])),
+        }];
+        let batch2 = batch1.clone().with_fields(fields).unwrap();
+        let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
+        assert!(
+            matches!(err, Error::InvalidBatch { .. }),
+            "unexpected err: {err}"
+        );
+    }
+
     #[test]
     fn test_filter_deleted_empty() {
         let mut batch = new_batch(&[], &[], &[], &[]);
diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs
new file mode 100644
index 000000000000..b253d183589b
--- /dev/null
+++ b/src/mito2/src/read/compat.rs
@@ -0,0 +1,600 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Utilities to adapt readers with different schemas.
+
+use std::collections::HashMap;
+
+use datatypes::value::Value;
+use datatypes::vectors::VectorRef;
+use snafu::{ensure, OptionExt, ResultExt};
+use store_api::metadata::{RegionMetadata, RegionMetadataRef};
+use store_api::storage::ColumnId;
+
+use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, Result};
+use crate::read::projection::ProjectionMapper;
+use crate::read::{Batch, BatchColumn, BatchReader};
+use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
+
+/// Reader to adapt the schema of the underlying reader to the expected schema.
+pub struct CompatReader<R> {
+    /// Underlying reader.
+    reader: R,
+    /// Optional primary key adapter.
+    compat_pk: Option<CompatPrimaryKey>,
+    /// Optional fields adapter.
+    compat_fields: Option<CompatFields>,
+}
+
+impl<R> CompatReader<R> {
+    /// Creates a new compat reader.
+    /// - `mapper` is built from the metadata users expect to see.
+    /// - `reader_meta` is the metadata of the input reader.
+    /// - `reader` is the input reader.
+    pub fn new(
+        mapper: &ProjectionMapper,
+        reader_meta: RegionMetadataRef,
+        reader: R,
+    ) -> Result<CompatReader<R>> {
+        let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
+        let compat_fields = may_compat_fields(mapper, &reader_meta)?;
+
+        Ok(CompatReader {
+            reader,
+            compat_pk,
+            compat_fields,
+        })
+    }
+}
+
+#[async_trait::async_trait]
+impl<R: BatchReader> BatchReader for CompatReader<R> {
+    async fn next_batch(&mut self) -> Result<Option<Batch>> {
+        let Some(mut batch) = self.reader.next_batch().await? else {
+            return Ok(None);
+        };
+
+        if let Some(compat_pk) = &self.compat_pk {
+            batch = compat_pk.compat(batch)?;
+        }
+        if let Some(compat_fields) = &self.compat_fields {
+            batch = compat_fields.compat(batch);
+        }
+
+        Ok(Some(batch))
+    }
+}
+
+/// Returns true if `left` and `right` have the same columns to read.
+///
+/// It only considers column ids.
+pub(crate) fn has_same_columns(left: &RegionMetadata, right: &RegionMetadata) -> bool {
+    if left.column_metadatas.len() != right.column_metadatas.len() {
+        return false;
+    }
+
+    for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
+        if left_col.column_id != right_col.column_id {
+            return false;
+        }
+        debug_assert_eq!(
+            left_col.column_schema.data_type,
+            right_col.column_schema.data_type
+        );
+        debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
+    }
+
+    true
+}
+
+/// Helper to make the primary key compatible.
+#[derive(Debug)]
+struct CompatPrimaryKey {
+    /// Row converter to append values to primary keys.
+    converter: McmpRowCodec,
+    /// Default values to append.
+    values: Vec<Value>,
+}
+
+impl CompatPrimaryKey {
+    /// Makes the primary key of the `batch` compatible.
+    fn compat(&self, mut batch: Batch) -> Result<Batch> {
+        let mut buffer =
+            Vec::with_capacity(batch.primary_key().len() + self.converter.estimated_size());
+        buffer.extend_from_slice(batch.primary_key());
+        self.converter.encode_to_vec(
+            self.values.iter().map(|value| value.as_value_ref()),
+            &mut buffer,
+        )?;
+
+        batch.set_primary_key(buffer);
+        Ok(batch)
+    }
+}
+
+/// Helper to make fields compatible.
+#[derive(Debug)]
+struct CompatFields {
+    /// Column ids the reader actually returns.
+    actual_fields: Vec<ColumnId>,
+    /// Indices to convert actual fields to expected fields.
+    index_or_defaults: Vec<IndexOrDefault>,
+}
+
+impl CompatFields {
+    /// Makes the fields of the `batch` compatible.
+    #[must_use]
+    fn compat(&self, batch: Batch) -> Batch {
+        debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
+        debug_assert!(self
+            .actual_fields
+            .iter()
+            .zip(batch.fields())
+            .all(|(id, batch_column)| *id == batch_column.column_id));
+
+        let len = batch.num_rows();
+        let fields = self
+            .index_or_defaults
+            .iter()
+            .map(|index_or_default| match index_or_default {
+                IndexOrDefault::Index(index) => batch.fields()[*index].clone(),
+                IndexOrDefault::DefaultValue {
+                    column_id,
+                    default_vector,
+                } => {
+                    let data = default_vector.replicate(&[len]);
+                    BatchColumn {
+                        column_id: *column_id,
+                        data,
+                    }
+                }
+            })
+            .collect();
+
+        // Safety: We ensure all columns have the same length and the new batch should be valid.
+        batch.with_fields(fields).unwrap()
+    }
+}
+
+/// Creates a [CompatPrimaryKey] if needed.
+fn may_compat_primary_key(
+    expect: &RegionMetadata,
+    actual: &RegionMetadata,
+) -> Result<Option<CompatPrimaryKey>> {
+    ensure!(
+        actual.primary_key.len() <= expect.primary_key.len(),
+        CompatReaderSnafu {
+            region_id: expect.region_id,
+            reason: format!(
+                "primary key has more columns {} than expected {}",
+                actual.primary_key.len(),
+                expect.primary_key.len()
+            ),
+        }
+    );
+    ensure!(
+        actual.primary_key == expect.primary_key[..actual.primary_key.len()],
+        CompatReaderSnafu {
+            region_id: expect.region_id,
+            reason: format!(
+                "primary key has different prefix, expect: {:?}, actual: {:?}",
+                expect.primary_key, actual.primary_key
+            ),
+        }
+    );
+    if actual.primary_key.len() == expect.primary_key.len() {
+        return Ok(None);
+    }
+
+    // We need to append default values to the primary key.
+    let to_add = &expect.primary_key[actual.primary_key.len()..];
+    let mut fields = Vec::with_capacity(to_add.len());
+    let mut values = Vec::with_capacity(to_add.len());
+    for column_id in to_add {
+        // Safety: The id comes from expect region metadata.
+        let column = expect.column_by_id(*column_id).unwrap();
+        fields.push(SortField::new(column.column_schema.data_type.clone()));
+        let default_value = column
+            .column_schema
+            .create_default()
+            .context(CreateDefaultSnafu {
+                region_id: expect.region_id,
+                column: &column.column_schema.name,
+            })?
+            .with_context(|| CompatReaderSnafu {
+                region_id: expect.region_id,
+                reason: format!(
+                    "key column {} does not have a default value to read",
+                    column.column_schema.name
+                ),
+            })?;
+        values.push(default_value);
+    }
+    let converter = McmpRowCodec::new(fields);
+
+    Ok(Some(CompatPrimaryKey { converter, values }))
+}
+
+/// Creates a [CompatFields] if needed.
+fn may_compat_fields(
+    mapper: &ProjectionMapper,
+    actual: &RegionMetadata,
+) -> Result<Option<CompatFields>> {
+    let expect_fields = mapper.batch_fields();
+    let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
+    if expect_fields == actual_fields {
+        return Ok(None);
+    }
+
+    let source_field_index: HashMap<_, _> = actual_fields
+        .iter()
+        .enumerate()
+        .map(|(idx, column_id)| (*column_id, idx))
+        .collect();
+
+    let index_or_defaults = expect_fields
+        .iter()
+        .map(|column_id| {
+            if let Some(index) = source_field_index.get(column_id) {
+                // Source has this field.
+                Ok(IndexOrDefault::Index(*index))
+            } else {
+                // Safety: mapper must have this column.
+                let column = mapper.metadata().column_by_id(*column_id).unwrap();
+                // Create a default vector with 1 element for that column.
+                let default_vector = column
+                    .column_schema
+                    .create_default_vector(1)
+                    .context(CreateDefaultSnafu {
+                        region_id: mapper.metadata().region_id,
+                        column: &column.column_schema.name,
+                    })?
+                    .with_context(|| CompatReaderSnafu {
+                        region_id: mapper.metadata().region_id,
+                        reason: format!(
+                            "column {} does not have a default value to read",
+                            column.column_schema.name
+                        ),
+                    })?;
+                Ok(IndexOrDefault::DefaultValue {
+                    column_id: column.column_id,
+                    default_vector,
+                })
+            }
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    Ok(Some(CompatFields {
+        actual_fields,
+        index_or_defaults,
+    }))
+}
+
+/// Index in source batch or a default value to fill a column.
+#[derive(Debug)]
+enum IndexOrDefault {
+    /// Index of the column in source batch.
+    Index(usize),
+    /// Default value for the column.
+    DefaultValue {
+        /// Id of the column.
+        column_id: ColumnId,
+        /// Default value. The vector has only 1 element.
+ default_vector: VectorRef, + }, +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::{OpType, SemanticType}; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use datatypes::value::ValueRef; + use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector}; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::storage::RegionId; + + use super::*; + use crate::test_util::{check_reader_result, VecBatchReader}; + + /// Creates a new [RegionMetadata]. + fn new_metadata( + semantic_types: &[(ColumnId, SemanticType)], + primary_key: &[ColumnId], + ) -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); + for (id, semantic_type) in semantic_types { + let column_schema = match semantic_type { + SemanticType::Tag => ColumnSchema::new( + format!("tag_{id}"), + ConcreteDataType::string_datatype(), + true, + ), + SemanticType::Field => ColumnSchema::new( + format!("field_{id}"), + ConcreteDataType::int64_datatype(), + true, + ), + SemanticType::Timestamp => ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + }; + + builder.push_column_metadata(ColumnMetadata { + column_schema, + semantic_type: *semantic_type, + column_id: *id, + }); + } + builder.primary_key(primary_key.to_vec()); + builder.build().unwrap() + } + + /// Encode primary key. + fn encode_key(keys: &[Option<&str>]) -> Vec { + let fields = (0..keys.len()) + .map(|_| SortField::new(ConcreteDataType::string_datatype())) + .collect(); + let converter = McmpRowCodec::new(fields); + let row = keys.iter().map(|str_opt| match str_opt { + Some(v) => ValueRef::String(v), + None => ValueRef::Null, + }); + + converter.encode(row).unwrap() + } + + /// Creates a batch for specific primary `key`. 
+ /// + /// `fields`: [(column_id of the field, is null)] + fn new_batch( + primary_key: &[u8], + fields: &[(ColumnId, bool)], + start_ts: i64, + num_rows: usize, + ) -> Batch { + let timestamps = Arc::new(TimestampMillisecondVector::from_values( + start_ts..start_ts + num_rows as i64, + )); + let sequences = Arc::new(UInt64Vector::from_values(0..num_rows as u64)); + let op_types = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; num_rows])); + let field_columns = fields + .iter() + .map(|(id, is_null)| { + let data = if *is_null { + Arc::new(Int64Vector::from(vec![None; num_rows])) + } else { + Arc::new(Int64Vector::from_vec(vec![*id as i64; num_rows])) + }; + BatchColumn { + column_id: *id, + data, + } + }) + .collect(); + Batch::new( + primary_key.to_vec(), + timestamps, + sequences, + op_types, + field_columns, + ) + .unwrap() + } + + #[test] + fn test_invalid_pk_len() { + let reader_meta = new_metadata( + &[ + (0, SemanticType::Timestamp), + (1, SemanticType::Tag), + (2, SemanticType::Tag), + (3, SemanticType::Field), + ], + &[1, 2], + ); + let expect_meta = new_metadata( + &[ + (0, SemanticType::Timestamp), + (1, SemanticType::Tag), + (2, SemanticType::Field), + ], + &[1], + ); + may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err(); + } + + #[test] + fn test_different_pk() { + let reader_meta = new_metadata( + &[ + (0, SemanticType::Timestamp), + (1, SemanticType::Tag), + (2, SemanticType::Tag), + (3, SemanticType::Field), + ], + &[2, 1], + ); + let expect_meta = new_metadata( + &[ + (0, SemanticType::Timestamp), + (1, SemanticType::Tag), + (2, SemanticType::Tag), + (3, SemanticType::Field), + (4, SemanticType::Tag), + ], + &[1, 2, 4], + ); + may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err(); + } + + #[test] + fn test_same_pk() { + let reader_meta = new_metadata( + &[ + (0, SemanticType::Timestamp), + (1, SemanticType::Tag), + (2, SemanticType::Field), + ], + &[1], + ); + assert!(may_compat_primary_key(&reader_meta, &reader_meta) + .unwrap() + .is_none()); + } + + #[test] + fn test_same_fields() { + let reader_meta = Arc::new(new_metadata( + &[ + (0, SemanticType::Timestamp), + (1, SemanticType::Tag), + (2, SemanticType::Field), + ], + &[1], + )); + let mapper = ProjectionMapper::all(&reader_meta).unwrap(); + assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none()) + } + + #[tokio::test] + async fn test_compat_reader() { + let reader_meta = Arc::new(new_metadata( + &[ + (0, SemanticType::Timestamp), + (1, SemanticType::Tag), + (2, SemanticType::Field), + ], + &[1], + )); + let expect_meta = Arc::new(new_metadata( + &[ + (0, SemanticType::Timestamp), + (1, SemanticType::Tag), + (2, SemanticType::Field), + (3, SemanticType::Tag), + (4, SemanticType::Field), + ], + &[1, 3], + )); + let mapper = ProjectionMapper::all(&expect_meta).unwrap(); + let k1 = encode_key(&[Some("a")]); + let k2 = encode_key(&[Some("b")]); + let source_reader = VecBatchReader::new(&[ + new_batch(&k1, &[(2, false)], 1000, 3), + new_batch(&k2, &[(2, false)], 1000, 3), + ]); + + let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap(); + let k1 = encode_key(&[Some("a"), None]); + let k2 = encode_key(&[Some("b"), None]); + check_reader_result( + &mut compat_reader, + &[ + new_batch(&k1, &[(2, false), (4, true)], 1000, 3), + new_batch(&k2, &[(2, false), (4, true)], 1000, 3), + ], + ) + .await; + } + + #[tokio::test] + async fn test_compat_reader_different_order() { + let reader_meta = Arc::new(new_metadata( + &[ + (0, SemanticType::Timestamp), + 
(1, SemanticType::Tag), + (2, SemanticType::Field), + ], + &[1], + )); + let expect_meta = Arc::new(new_metadata( + &[ + (0, SemanticType::Timestamp), + (1, SemanticType::Tag), + (3, SemanticType::Field), + (2, SemanticType::Field), + (4, SemanticType::Field), + ], + &[1], + )); + let mapper = ProjectionMapper::all(&expect_meta).unwrap(); + let k1 = encode_key(&[Some("a")]); + let k2 = encode_key(&[Some("b")]); + let source_reader = VecBatchReader::new(&[ + new_batch(&k1, &[(2, false)], 1000, 3), + new_batch(&k2, &[(2, false)], 1000, 3), + ]); + + let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap(); + check_reader_result( + &mut compat_reader, + &[ + new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3), + new_batch(&k2, &[(3, true), (2, false), (4, true)], 1000, 3), + ], + ) + .await; + } + + #[tokio::test] + async fn test_compat_reader_projection() { + let reader_meta = Arc::new(new_metadata( + &[ + (0, SemanticType::Timestamp), + (1, SemanticType::Tag), + (2, SemanticType::Field), + ], + &[1], + )); + let expect_meta = Arc::new(new_metadata( + &[ + (0, SemanticType::Timestamp), + (1, SemanticType::Tag), + (3, SemanticType::Field), + (2, SemanticType::Field), + (4, SemanticType::Field), + ], + &[1], + )); + // tag_1, field_2, field_3 + let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter()).unwrap(); + let k1 = encode_key(&[Some("a")]); + let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]); + + let mut compat_reader = + CompatReader::new(&mapper, reader_meta.clone(), source_reader).unwrap(); + check_reader_result( + &mut compat_reader, + &[new_batch(&k1, &[(3, true), (2, false)], 1000, 3)], + ) + .await; + + // tag_1, field_4, field_3 + let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter()).unwrap(); + let k1 = encode_key(&[Some("a")]); + let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]); + + let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap(); + check_reader_result( + &mut compat_reader, + &[new_batch(&k1, &[(3, true), (4, true)], 1000, 3)], + ) + .await; + } +} diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs index 1ccbf61e0428..219ae6dab97b 100644 --- a/src/mito2/src/read/merge.rs +++ b/src/mito2/src/read/merge.rs @@ -341,7 +341,7 @@ mod tests { use api::v1::OpType; use super::*; - use crate::test_util::{new_batch, VecBatchReader}; + use crate::test_util::{check_reader_result, new_batch, VecBatchReader}; #[tokio::test] async fn test_merge_reader_empty() { @@ -350,15 +350,6 @@ mod tests { assert!(reader.next_batch().await.unwrap().is_none()); } - async fn check_merge_result(reader: &mut MergeReader, expect: &[Batch]) { - let mut result = Vec::new(); - while let Some(batch) = reader.next_batch().await.unwrap() { - result.push(batch); - } - - assert_eq!(expect, result); - } - #[tokio::test] async fn test_merge_non_overlapping() { let reader1 = VecBatchReader::new(&[ @@ -397,7 +388,7 @@ mod tests { .build() .await .unwrap(); - check_merge_result( + check_reader_result( &mut reader, &[ new_batch( @@ -468,7 +459,7 @@ mod tests { .build() .await .unwrap(); - check_merge_result( + check_reader_result( &mut reader, &[ new_batch( diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index 4f61affdc2f4..0af9fa487212 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -25,7 +25,7 @@ use datatypes::schema::{Schema, SchemaRef}; use 
datatypes::value::ValueRef; use datatypes::vectors::VectorRef; use snafu::{OptionExt, ResultExt}; -use store_api::metadata::RegionMetadata; +use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use crate::error::{InvalidRequestSnafu, Result}; @@ -33,21 +33,26 @@ use crate::read::Batch; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; /// Handles projection and converts a projected [Batch] to a projected [RecordBatch]. -pub(crate) struct ProjectionMapper { +pub struct ProjectionMapper { + /// Metadata of the region. + metadata: RegionMetadataRef, /// Maps column in [RecordBatch] to index in [Batch]. batch_indices: Vec, /// Decoder for primary key. codec: McmpRowCodec, /// Schema for converted [RecordBatch]. output_schema: SchemaRef, - /// Id of columns to project. + /// Ids of columns to project. It keeps ids in the same order as the `projection` + /// indices to build the mapper. column_ids: Vec, + /// Ids of field columns in the [Batch]. + batch_fields: Vec, } impl ProjectionMapper { /// Returns a new mapper with projection. - pub(crate) fn new( - metadata: &RegionMetadata, + pub fn new( + metadata: &RegionMetadataRef, projection: impl Iterator, ) -> Result { let projection_len = projection.size_hint().0; @@ -92,25 +97,38 @@ impl ProjectionMapper { ); // Safety: Columns come from existing schema. let output_schema = Arc::new(Schema::new(column_schemas)); + let batch_fields = Batch::projected_fields(metadata, &column_ids); Ok(ProjectionMapper { + metadata: metadata.clone(), batch_indices, codec, output_schema, column_ids, + batch_fields, }) } /// Returns a new mapper without projection. - pub(crate) fn all(metadata: &RegionMetadata) -> Result { + pub fn all(metadata: &RegionMetadataRef) -> Result { ProjectionMapper::new(metadata, 0..metadata.column_metadatas.len()) } + /// Returns the metadata that created the mapper. + pub(crate) fn metadata(&self) -> &RegionMetadataRef { + &self.metadata + } + /// Returns ids of projected columns. pub(crate) fn column_ids(&self) -> &[ColumnId] { &self.column_ids } + /// Returns ids of fields in [Batch]es the mapper expects to convert. + pub(crate) fn batch_fields(&self) -> &[ColumnId] { + &self.batch_fields + } + /// Returns the schema of converted [RecordBatch]. pub(crate) fn output_schema(&self) -> SchemaRef { self.output_schema.clone() @@ -120,6 +138,13 @@ impl ProjectionMapper { /// /// The batch must match the `projection` using to build the mapper. pub(crate) fn convert(&self, batch: &Batch) -> common_recordbatch::error::Result { + debug_assert_eq!(self.batch_fields.len(), batch.fields().len()); + debug_assert!(self + .batch_fields + .iter() + .zip(batch.fields()) + .all(|(id, batch_col)| *id == batch_col.column_id)); + let pk_values = self .codec .decode(batch.primary_key()) @@ -179,3 +204,5 @@ fn new_repeated_vector( let base_vector = mutable_vector.to_vector(); Ok(base_vector.replicate(&[num_rows])) } + +// TODO(yingwen): Add tests for mapper. 
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index 0b7d9e23ed93..5e3381cd1ead 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -28,6 +28,7 @@ use table::predicate::Predicate;
 use crate::access_layer::AccessLayerRef;
 use crate::error::Result;
 use crate::memtable::MemtableRef;
+use crate::read::compat::{self, CompatReader};
 use crate::read::merge::MergeReaderBuilder;
 use crate::read::projection::ProjectionMapper;
 use crate::read::BatchReader;
@@ -119,7 +120,15 @@ impl SeqScan {
                 .projection(Some(self.mapper.column_ids().to_vec()))
                 .build()
                 .await?;
-            builder.push_batch_reader(Box::new(reader));
+            if compat::has_same_columns(self.mapper.metadata(), reader.metadata()) {
+                builder.push_batch_reader(Box::new(reader));
+            } else {
+                // They have different schemas. We need to adapt the batch first so the
+                // mapper can convert it.
+                let compat_reader =
+                    CompatReader::new(&self.mapper, reader.metadata().clone(), reader)?;
+                builder.push_batch_reader(Box::new(compat_reader));
+            }
         }
         let mut reader = builder.build().await?;
         // Creates a stream to poll the batch reader and convert batch into record batch.
diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs
index 603d39286d40..3b0c37ec2d78 100644
--- a/src/mito2/src/row_converter.rs
+++ b/src/mito2/src/row_converter.rs
@@ -36,10 +36,18 @@ pub trait RowCodec {
     where
         I: Iterator<Item = ValueRef<'a>>;
 
+    /// Encodes rows to the given buffer.
+    /// # Note
+    /// Ensure the length of the row iterator matches the length of fields.
+    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
+    where
+        I: Iterator<Item = ValueRef<'a>>;
+
     /// Decode row values from bytes.
     fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>>;
 }
 
+#[derive(Debug)]
 pub struct SortField {
     data_type: ConcreteDataType,
 }
@@ -199,6 +207,7 @@ impl SortField {
 }
 
 /// A memory-comparable row [Value] encoder/decoder.
+#[derive(Debug)]
 pub struct McmpRowCodec {
     fields: Vec<SortField>,
 }
@@ -223,12 +232,21 @@ impl RowCodec for McmpRowCodec {
     where
         I: Iterator<Item = ValueRef<'a>>,
     {
-        let mut bytes = Vec::with_capacity(self.estimated_size());
-        let mut serializer = Serializer::new(&mut bytes);
+        let mut buffer = Vec::new();
+        self.encode_to_vec(row, &mut buffer)?;
+        Ok(buffer)
+    }
+
+    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
+    where
+        I: Iterator<Item = ValueRef<'a>>,
+    {
+        buffer.reserve(self.estimated_size());
+        let mut serializer = Serializer::new(buffer);
         for (value, field) in row.zip(self.fields.iter()) {
             field.serialize(&mut serializer, &value)?;
         }
-        Ok(bytes)
+        Ok(())
     }
 
     fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>> {
diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs
index 977604c2607f..b8af414b1c97 100644
--- a/src/mito2/src/sst/parquet/format.rs
+++ b/src/mito2/src/sst/parquet/format.rs
@@ -138,6 +138,11 @@ impl ReadFormat {
         &self.arrow_schema
     }
 
+    /// Gets the metadata of the SST.
+    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
+        &self.metadata
+    }
+
     /// Gets sorted projection indices to read `columns` from parquet files.
/// /// This function ignores columns not in `metadata` to for compatibility between diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 06c9533602dd..85b1b47ae097 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -27,7 +27,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::errors::ParquetError; use parquet::format::KeyValue; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::metadata::RegionMetadata; +use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; use table::predicate::Predicate; use tokio::io::BufReader; @@ -42,11 +42,18 @@ use crate::sst::parquet::PARQUET_METADATA_KEY; /// Parquet SST reader builder. pub struct ParquetReaderBuilder { + /// SST directory. file_dir: String, file_handle: FileHandle, object_store: ObjectStore, + /// Predicate to push down. predicate: Option, + /// Time range to filter. time_range: Option, + /// Metadata of columns to read. + /// + /// `None` reads all columns. Due to schema change, the projection + /// can contain columns not in the parquet file. projection: Option>, } @@ -97,10 +104,6 @@ impl ParquetReaderBuilder { Ok(ParquetReader { file_path, file_handle: self.file_handle, - object_store: self.object_store, - predicate: self.predicate, - time_range: self.time_range, - projection: self.projection, stream, read_format, batches: Vec::new(), @@ -206,17 +209,6 @@ pub struct ParquetReader { /// /// Holds the file handle to avoid the file purge purge it. file_handle: FileHandle, - object_store: ObjectStore, - /// Predicate to push down. - predicate: Option, - /// Time range to filter. - time_range: Option, - /// Metadata of columns to read. - /// - /// `None` reads all columns. Due to schema change, the projection - /// can contain columns not in the parquet file. - projection: Option>, - /// Inner parquet record batch stream. stream: BoxedRecordBatchStream, /// Helper to read record batches. @@ -250,3 +242,10 @@ impl BatchReader for ParquetReader { Ok(self.batches.pop()) } } + +impl ParquetReader { + /// Returns the metadata of the SST. + pub fn metadata(&self) -> &RegionMetadataRef { + self.read_format.metadata() + } +} diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 01372593a53a..a4a01d290db9 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -384,6 +384,16 @@ pub fn new_batch( .unwrap() } +/// Ensure the reader returns batch as `expect`. +pub async fn check_reader_result(reader: &mut R, expect: &[Batch]) { + let mut result = Vec::new(); + while let Some(batch) = reader.next_batch().await.unwrap() { + result.push(batch); + } + + assert_eq!(expect, result); +} + /// A mock [WriteBufferManager] that supports controlling whether to flush/stall. #[derive(Debug, Default)] pub struct MockWriteBufferManager {