Skip to content

Commit

Permalink
feat: Implements a reader to make schemas compatible (#2326)
Browse files Browse the repository at this point in the history
* docs: update comment

* feat: Add compat reader to SeqScan

* feat: add struct to compat pk and fields

* refactor: remove unused fields from ParquetReader

* feat: compat framework

* feat: Implement CompatPrimaryKey and CompatFields

* feat: implement compat reader

* feat: Test compat reader

* test: test compat reader

* feat: add more checks to concat

* style: fix clippy

* test: more tests for compat reader

* test: test reader with projection
  • Loading branch information
evenyag authored and waynexia committed Sep 12, 2023
1 parent 6041f09 commit 98cba4e
Show file tree
Hide file tree
Showing 10 changed files with 797 additions and 52 deletions.
13 changes: 13 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,18 @@ pub enum Error {
region_id: RegionId,
location: Location,
},

#[snafu(display(
"Failed to compat readers for region {}, reason: {}, location: {}",
region_id,
reason,
location
))]
CompatReader {
region_id: RegionId,
reason: String,
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -500,6 +512,7 @@ impl ErrorExt for Error {
RegionDropped { .. } => StatusCode::Cancelled,
RegionClosed { .. } => StatusCode::Cancelled,
RejectWrite { .. } => StatusCode::StorageUnavailable,
CompatReader { .. } => StatusCode::Unexpected,
}
}

Expand Down
101 changes: 87 additions & 14 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

//! Common structs and utilities for reading data.

pub mod compat;
pub mod merge;
pub(crate) mod projection;
pub mod projection;
pub(crate) mod scan_region;
pub(crate) mod seq_scan;

use std::collections::HashSet;
use std::sync::Arc;

use api::v1::OpType;
Expand All @@ -34,17 +36,18 @@ use datatypes::vectors::{
BooleanVector, Helper, UInt32Vector, UInt64Vector, UInt8Vector, Vector, VectorRef,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
};
use crate::memtable::BoxedBatchIterator;

/// Storage internal representation of a batch of rows
/// for a primary key (time series).
/// Storage internal representation of a batch of rows for a primary key (time series).
///
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc.
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
/// always keep the same relative order as fields in [RegionMetadata](store_api::metadata::RegionMetadata).
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
/// Primary key encoded in a comparable form.
Expand Down Expand Up @@ -77,6 +80,17 @@ impl Batch {
.build()
}

/// Tries to set fields for the batch.
pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
Batch::new(
self.primary_key,
self.timestamps,
self.sequences,
self.op_types,
fields,
)
}

/// Returns primary key of the batch.
pub fn primary_key(&self) -> &[u8] {
&self.primary_key
Expand Down Expand Up @@ -150,6 +164,11 @@ impl Batch {
Some(self.get_sequence(self.sequences.len() - 1))
}

/// Overwrites the encoded primary key of this batch with `primary_key`.
///
/// The previous key is dropped; no validation is performed on the new one.
pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
    self.primary_key = primary_key;
}

/// Slice the batch, returning a new batch.
///
/// # Panics
Expand Down Expand Up @@ -202,15 +221,22 @@ impl Batch {
reason: "batches have different primary key",
}
);
ensure!(
batches
.iter()
.skip(1)
.all(|b| b.fields().len() == first.fields().len()),
InvalidBatchSnafu {
reason: "batches have different field num",
for b in batches.iter().skip(1) {
ensure!(
b.fields.len() == first.fields.len(),
InvalidBatchSnafu {
reason: "batches have different field num",
}
);
for (l, r) in b.fields.iter().zip(&first.fields) {
ensure!(
l.column_id == r.column_id,
InvalidBatchSnafu {
reason: "batches have different fields",
}
);
}
);
}

// We take the primary key from the first batch.
let mut builder = BatchBuilder::new(primary_key);
Expand Down Expand Up @@ -311,6 +337,24 @@ impl Batch {
self.take_in_place(&indices)
}

/// Returns ids of fields in the [Batch] after applying the `projection`.
///
/// The result preserves the field order of `metadata`, keeping only the
/// field columns whose id appears in `projection`.
pub(crate) fn projected_fields(
    metadata: &RegionMetadata,
    projection: &[ColumnId],
) -> Vec<ColumnId> {
    // Build a set once so membership checks are O(1) per field column.
    let selected: HashSet<_> = projection.iter().copied().collect();
    metadata
        .field_columns()
        .map(|column| column.column_id)
        .filter(|id| selected.contains(id))
        .collect()
}

/// Takes the batch in place.
fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
Expand Down Expand Up @@ -566,8 +610,6 @@ impl Source {
/// The reader must guarantee [Batch]es returned by it have the same schema.
#[async_trait]
pub trait BatchReader: Send {
// TODO(yingwen): fields of the batch returned.

/// Fetch next [Batch].
///
/// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()`
Expand Down Expand Up @@ -729,6 +771,37 @@ mod tests {
);
}

#[test]
fn test_concat_different_fields() {
    let base = new_batch(&[1], &[1], &[OpType::Put], &[1]);

    // Case 1: the second batch carries an extra field column, so the
    // field counts differ and concat must be rejected.
    let extra_field = BatchColumn {
        column_id: 2,
        data: Arc::new(UInt64Vector::from_slice([2])),
    };
    let wider = base
        .clone()
        .with_fields(vec![base.fields()[0].clone(), extra_field])
        .unwrap();
    let err = Batch::concat(vec![base.clone(), wider]).unwrap_err();
    assert!(
        matches!(err, Error::InvalidBatch { .. }),
        "unexpected err: {err}"
    );

    // Case 2: same field count but a different column id, which must
    // also be rejected.
    let replaced = base
        .clone()
        .with_fields(vec![BatchColumn {
            column_id: 2,
            data: Arc::new(UInt64Vector::from_slice([2])),
        }])
        .unwrap();
    let err = Batch::concat(vec![base, replaced]).unwrap_err();
    assert!(
        matches!(err, Error::InvalidBatch { .. }),
        "unexpected err: {err}"
    );
}

#[test]
fn test_filter_deleted_empty() {
let mut batch = new_batch(&[], &[], &[], &[]);
Expand Down
Loading

0 comments on commit 98cba4e

Please sign in to comment.