From 73eacbed6f5545982b59ecb97d061543840e1cff Mon Sep 17 00:00:00 2001
From: evenyag
Date: Tue, 5 Sep 2023 17:00:15 +0800
Subject: [PATCH] feat: implement compat reader

---
Notes (this section is not part of the commit message):

- read.rs: adds `Batch::with_fields`, which consumes the batch and rebuilds it
  through `Batch::new` with the given field columns; removes a TODO comment
  from the `BatchReader` trait.
- compat.rs: `CompatReader::next_batch` replaces `todo!()` with the real
  adaptation: the optional primary-key adapter runs first (fallible), then the
  optional field adapter.
- `CompatPrimaryKey::compat` copies the existing key into a new buffer and
  appends the encoded `values` after it.
- `CompatFields::compat` builds one column per `index_or_defaults` entry:
  either a clone of an existing batch column or the default vector replicated
  with `&[len]`, where `len` is the batch's row count.
- NOTE(review): `#[must_use]` on `CompatPrimaryKey::compat` looks redundant,
  assuming `crate::error::Result` aliases `std::result::Result` (already
  `#[must_use]`); clippy's `double_must_use` would flag it -- confirm.
- NOTE(review): the `// Safety:` comment in `CompatFields::compat` documents a
  plain `unwrap()`, not an `unsafe` block; consider `expect("...")` carrying
  the same invariant instead.
- NOTE(review): the author's e-mail address is missing from the `From:` header.

 src/mito2/src/read.rs        | 13 +++++--
 src/mito2/src/read/compat.rs | 67 +++++++++++++++++++++++++++++++++---
 2 files changed, 74 insertions(+), 6 deletions(-)

diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index ef7744385e3a..fed724eb9fc7 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -80,6 +80,17 @@ impl Batch {
             .build()
     }
 
+    /// Tries to set fields for the batch.
+    pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
+        Batch::new(
+            self.primary_key,
+            self.timestamps,
+            self.sequences,
+            self.op_types,
+            fields,
+        )
+    }
+
     /// Returns primary key of the batch.
     pub fn primary_key(&self) -> &[u8] {
         &self.primary_key
@@ -593,8 +604,6 @@ impl Source {
 /// The reader must guarantee [Batch]es returned by it have the same schema.
 #[async_trait]
 pub trait BatchReader: Send {
-    // TODO(yingwen): fields of the batch returned.
-
     /// Fetch next [Batch].
     ///
     /// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()`
diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs
index 21b5502631f6..8d33d0ed5138 100644
--- a/src/mito2/src/read/compat.rs
+++ b/src/mito2/src/read/compat.rs
@@ -24,8 +24,8 @@ use store_api::storage::ColumnId;
 
 use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, Result};
 use crate::read::projection::ProjectionMapper;
-use crate::read::{Batch, BatchReader};
-use crate::row_converter::{McmpRowCodec, SortField};
+use crate::read::{Batch, BatchColumn, BatchReader};
+use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
 
 /// Reader to adapt schema of underlying reader to expected schema.
 pub struct CompatReader {
@@ -61,11 +61,18 @@ impl CompatReader {
 #[async_trait::async_trait]
 impl BatchReader for CompatReader {
     async fn next_batch(&mut self) -> Result<Option<Batch>> {
-        let Some(batch) = self.reader.next_batch().await? else {
+        let Some(mut batch) = self.reader.next_batch().await? else {
             return Ok(None);
         };
 
-        todo!()
+        if let Some(compat_pk) = &self.compat_pk {
+            batch = compat_pk.compat(batch)?;
+        }
+        if let Some(compat_fields) = &self.compat_fields {
+            batch = compat_fields.compat(batch);
+        }
+
+        Ok(Some(batch))
     }
 }
 
@@ -99,6 +106,23 @@ struct CompatPrimaryKey {
     values: Vec<Value>,
 }
 
+impl CompatPrimaryKey {
+    /// Make primary key of the `batch` compatible.
+    #[must_use]
+    fn compat(&self, mut batch: Batch) -> Result<Batch> {
+        let mut buffer =
+            Vec::with_capacity(batch.primary_key().len() + self.converter.estimated_size());
+        buffer.extend_from_slice(batch.primary_key());
+        self.converter.encode_to_vec(
+            self.values.iter().map(|value| value.as_value_ref()),
+            &mut buffer,
+        )?;
+
+        batch.set_primary_key(buffer);
+        Ok(batch)
+    }
+}
+
 /// Helper to make fields compatible.
 struct CompatFields {
     /// Column Ids the reader actually returns.
@@ -107,6 +131,41 @@ struct CompatFields {
     index_or_defaults: Vec<IndexOrDefault>,
 }
 
+impl CompatFields {
+    /// Make fields of the `batch` compatible.
+    #[must_use]
+    fn compat(&self, batch: Batch) -> Batch {
+        debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
+        debug_assert!(self
+            .actual_fields
+            .iter()
+            .zip(batch.fields())
+            .all(|(id, batch_column)| *id == batch_column.column_id));
+
+        let len = batch.num_rows();
+        let fields = self
+            .index_or_defaults
+            .iter()
+            .map(|index_or_default| match index_or_default {
+                IndexOrDefault::Index(index) => batch.fields()[*index].clone(),
+                IndexOrDefault::DefaultValue {
+                    column_id,
+                    default_vector,
+                } => {
+                    let data = default_vector.replicate(&[len]);
+                    BatchColumn {
+                        column_id: *column_id,
+                        data,
+                    }
+                }
+            })
+            .collect();
+
+        // Safety: We ensure all columns have the same length and the new batch should be valid.
+        batch.with_fields(fields).unwrap()
+    }
+}
+
 /// Creates a [CompatPrimaryKey] if needed.
 fn may_compat_primary_key(
     expect: &RegionMetadata,