Skip to content

Commit

Permalink
feat: implement compat reader
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Sep 5, 2023
1 parent 66a0686 commit 73eacbe
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 6 deletions.
13 changes: 11 additions & 2 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,17 @@ impl Batch {
.build()
}

/// Tries to set fields for the batch.
pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
Batch::new(
self.primary_key,
self.timestamps,
self.sequences,
self.op_types,
fields,
)
}

/// Returns primary key of the batch.
pub fn primary_key(&self) -> &[u8] {
&self.primary_key
Expand Down Expand Up @@ -593,8 +604,6 @@ impl Source {
/// The reader must guarantee [Batch]es returned by it have the same schema.
#[async_trait]
pub trait BatchReader: Send {
// TODO(yingwen): fields of the batch returned.

/// Fetch next [Batch].
///
/// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()`
Expand Down
67 changes: 63 additions & 4 deletions src/mito2/src/read/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use store_api::storage::ColumnId;

use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, Result};
use crate::read::projection::ProjectionMapper;
use crate::read::{Batch, BatchReader};
use crate::row_converter::{McmpRowCodec, SortField};
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};

/// Reader to adapt schema of underlying reader to expected schema.
pub struct CompatReader<R> {
Expand Down Expand Up @@ -61,11 +61,18 @@ impl<R> CompatReader<R> {
// NOTE(review): this span comes from a diff view whose +/- markers were lost, so
// old and new lines appear side by side. Presumably the immutable
// `let Some(batch)` binding and the `todo!()` are the removed lines, replaced by
// the `let Some(mut batch)` binding and the compat logic -- confirm against the
// commit.
#[async_trait::async_trait]
impl<R: BatchReader> BatchReader for CompatReader<R> {
/// Fetches the next batch from the wrapped reader and adapts it.
///
/// Returns `Ok(None)` as soon as the wrapped reader returns `Ok(None)`. Otherwise
/// the batch is passed through `compat_pk` (fallible) and then `compat_fields`,
/// each only when the corresponding option is `Some`, and the result is returned.
async fn next_batch(&mut self) -> Result<Option<Batch>> {
let Some(batch) = self.reader.next_batch().await? else {
let Some(mut batch) = self.reader.next_batch().await? else {
return Ok(None);
};

todo!()
// Rewrite the primary key first; this step can fail and propagates its error.
if let Some(compat_pk) = &self.compat_pk {
batch = compat_pk.compat(batch)?;
}
// Then rebuild the field columns.
if let Some(compat_fields) = &self.compat_fields {
batch = compat_fields.compat(batch);
}

Ok(Some(batch))
}
}

Expand Down Expand Up @@ -99,6 +106,23 @@ struct CompatPrimaryKey {
values: Vec<Value>,
}

impl CompatPrimaryKey {
    /// Makes the primary key of `batch` compatible.
    ///
    /// Copies the batch's current primary key bytes into a new buffer, appends
    /// `self.values` encoded by `self.converter`, and installs the buffer as the
    /// batch's primary key.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `encode_to_vec` when encoding fails.
    // No `#[must_use]` here: the returned `Result` is already `#[must_use]`, so a bare
    // attribute on the function is redundant (clippy::double_must_use).
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        // Reserve room for the existing key plus the converter's size estimate.
        let mut buffer =
            Vec::with_capacity(batch.primary_key().len() + self.converter.estimated_size());
        buffer.extend_from_slice(batch.primary_key());
        self.converter.encode_to_vec(
            self.values.iter().map(|value| value.as_value_ref()),
            &mut buffer,
        )?;

        batch.set_primary_key(buffer);
        Ok(batch)
    }
}

/// Helper to make fields compatible.
struct CompatFields {
/// Column Ids the reader actually returns.
Expand All @@ -107,6 +131,41 @@ struct CompatFields {
index_or_defaults: Vec<IndexOrDefault>,
}

impl CompatFields {
    /// Makes the fields of `batch` compatible.
    ///
    /// Builds one output column per entry of `self.index_or_defaults`:
    /// - `IndexOrDefault::Index(i)` clones the batch's existing field column at `i`;
    /// - `IndexOrDefault::DefaultValue` creates a column with the given `column_id`
    ///   whose data is `default_vector.replicate(&[len])`, where `len` is the number
    ///   of rows in the batch (presumably a one-value vector expanded to `len`
    ///   rows -- confirm against `replicate`).
    ///
    /// In debug builds, also checks that the batch's field columns match
    /// `self.actual_fields` in count and column id order.
    ///
    /// # Panics
    ///
    /// Panics if an `Index` entry is out of bounds for the batch's fields, or if
    /// [`Batch::with_fields`] rejects the rebuilt columns.
    #[must_use]
    fn compat(&self, batch: Batch) -> Batch {
        debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
        debug_assert!(self
            .actual_fields
            .iter()
            .zip(batch.fields())
            .all(|(id, batch_column)| *id == batch_column.column_id));

        let len = batch.num_rows();
        let fields = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index(index) => batch.fields()[*index].clone(),
                IndexOrDefault::DefaultValue {
                    column_id,
                    default_vector,
                } => {
                    let data = default_vector.replicate(&[len]);
                    BatchColumn {
                        column_id: *column_id,
                        data,
                    }
                }
            })
            .collect();

        // Invariant: every column is either taken from the batch or replicated to the
        // batch's row count, so rebuilding the batch is expected to succeed. A failure
        // here is a bug, so panic with a message that names the broken invariant.
        batch
            .with_fields(fields)
            .expect("compat fields must have the same number of rows as the batch")
    }
}

/// Creates a [CompatPrimaryKey] if needed.
fn may_compat_primary_key(
expect: &RegionMetadata,
Expand Down

0 comments on commit 73eacbe

Please sign in to comment.