From 62541fc931a193e079eb7ce8bfe2c6f17265c35a Mon Sep 17 00:00:00 2001
From: ZENOTME <43447882+ZENOTME@users.noreply.github.com>
Date: Thu, 28 Nov 2024 13:32:35 +0800
Subject: [PATCH] feat: Add equality delete writer (#703)

* copy from #372
* fix test and refine ArrowFieldProjector
* add comment and rename to make code more clear
* refine code
* refine RecordBatchProjector
* refine error
* refine function name

---------

Co-authored-by: ZENOTME
---
 crates/iceberg/src/arrow/mod.rs                  |   1 +
 .../src/arrow/record_batch_projector.rs          | 291 ++++++++++
 .../base_writer/equality_delete_writer.rs        | 503 ++++++++++++++++++
 crates/iceberg/src/writer/base_writer/mod.rs     |   1 +
 4 files changed, 796 insertions(+)
 create mode 100644 crates/iceberg/src/arrow/record_batch_projector.rs
 create mode 100644 crates/iceberg/src/writer/base_writer/equality_delete_writer.rs

diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs
index 31a892fa8..0f01324cb 100644
--- a/crates/iceberg/src/arrow/mod.rs
+++ b/crates/iceberg/src/arrow/mod.rs
@@ -20,6 +20,7 @@ mod schema;
 pub use schema::*;
 
 mod reader;
+pub(crate) mod record_batch_projector;
 pub(crate) mod record_batch_transformer;
 
 pub use reader::*;
diff --git a/crates/iceberg/src/arrow/record_batch_projector.rs b/crates/iceberg/src/arrow/record_batch_projector.rs
new file mode 100644
index 000000000..f218983aa
--- /dev/null
+++ b/crates/iceberg/src/arrow/record_batch_projector.rs
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, RecordBatch, StructArray};
+use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
+
+use crate::error::Result;
+use crate::{Error, ErrorKind};
+
+/// Helps to project specific fields from a `RecordBatch` according to their field ids.
+#[derive(Clone)]
+pub(crate) struct RecordBatchProjector {
+    // A vector of index paths, where each inner vector is the chain of column indices
+    // used to access a specific field in a nested structure. Paths are stored
+    // innermost-first and consumed in reverse.
+    // E.g. [[0], [2, 1]] means the first field is read directly from the first column,
+    // while the second field is read from the second column (index 1) and then from its
+    // third subcolumn (index 2), so the second column must be a struct column.
+    field_indices: Vec<Vec<usize>>,
+    // The schema after projection, derived from the original schema based on the given field ids.
+    projected_schema: SchemaRef,
+}
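+
+// For example, with columns `[a: Int32, b: Struct<c: Int32, d: Utf8>]` (an
+// illustrative schema, not part of this patch), the stored index path for `c`
+// would be [0, 1]: consumed in reverse, it first selects batch column 1 (`b`)
+// and then subcolumn 0 of that struct (`c`).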
+
+impl RecordBatchProjector {
+    /// Initialize a `RecordBatchProjector`.
+    ///
+    /// This function iterates through the given field ids and fetches the corresponding
+    /// fields from the original schema. The field id of each field is provided by
+    /// `field_id_fetch_func`, which returns `None` if the field should be skipped.
+    /// If a field is a struct, its nested fields are searched recursively;
+    /// `searchable_field_func` controls whether to descend into a struct field.
+    pub(crate) fn new<F1, F2>(
+        original_schema: SchemaRef,
+        field_ids: &[i32],
+        field_id_fetch_func: F1,
+        searchable_field_func: F2,
+    ) -> Result<Self>
+    where
+        F1: Fn(&Field) -> Result<Option<i64>>,
+        F2: Fn(&Field) -> bool,
+    {
+        let mut field_indices = Vec::with_capacity(field_ids.len());
+        let mut fields = Vec::with_capacity(field_ids.len());
+        for &id in field_ids {
+            let mut field_index = vec![];
+            let field = Self::fetch_field_index(
+                original_schema.fields(),
+                &mut field_index,
+                id as i64,
+                &field_id_fetch_func,
+                &searchable_field_func,
+            )?
+            .ok_or_else(|| {
+                Error::new(ErrorKind::Unexpected, "Field not found")
+                    .with_context("field_id", id.to_string())
+            })?;
+            fields.push(field.clone());
+            field_indices.push(field_index);
+        }
+        let projected_schema = Arc::new(Schema::new(fields));
+        Ok(Self {
+            field_indices,
+            projected_schema,
+        })
+    }
+
+    fn fetch_field_index<F1, F2>(
+        fields: &Fields,
+        index_vec: &mut Vec<usize>,
+        target_field_id: i64,
+        field_id_fetch_func: &F1,
+        searchable_field_func: &F2,
+    ) -> Result<Option<FieldRef>>
+    where
+        F1: Fn(&Field) -> Result<Option<i64>>,
+        F2: Fn(&Field) -> bool,
+    {
+        for (pos, field) in fields.iter().enumerate() {
+            let id = field_id_fetch_func(field)?;
+            if let Some(id) = id {
+                if target_field_id == id {
+                    index_vec.push(pos);
+                    return Ok(Some(field.clone()));
+                }
+            }
+            if let DataType::Struct(inner) = field.data_type() {
+                if searchable_field_func(field) {
+                    if let Some(res) = Self::fetch_field_index(
+                        inner,
+                        index_vec,
+                        target_field_id,
+                        field_id_fetch_func,
+                        searchable_field_func,
+                    )? {
+                        index_vec.push(pos);
+                        return Ok(Some(res));
+                    }
+                }
+            }
+        }
+        Ok(None)
+    }
+
+    /// Return a reference to the projected schema.
+    pub(crate) fn projected_schema_ref(&self) -> &SchemaRef {
+        &self.projected_schema
+    }
+
+    /// Project the given record batch onto the projected schema.
+    pub(crate) fn project_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        RecordBatch::try_new(
+            self.projected_schema.clone(),
+            self.project_column(batch.columns())?,
+        )
+        .map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}")))
+    }
+
+    /// Project the given columns according to the stored field index paths.
+    pub(crate) fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
+        self.field_indices
+            .iter()
+            .map(|index_vec| Self::get_column_by_field_index(batch, index_vec))
+            .collect::<Result<Vec<ArrayRef>>>()
+    }
+
+    fn get_column_by_field_index(batch: &[ArrayRef], field_index: &[usize]) -> Result<ArrayRef> {
+        let mut rev_iterator = field_index.iter().rev();
+        let mut array = batch[*rev_iterator.next().unwrap()].clone();
+        for idx in rev_iterator {
+            array = array
+                .as_any()
+                .downcast_ref::<StructArray>()
+                .ok_or(Error::new(
+                    ErrorKind::Unexpected,
+                    "Cannot convert Array to StructArray",
+                ))?
+                .column(*idx)
+                .clone();
+        }
+        Ok(array)
+    }
+}
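+
+// A minimal usage sketch (assumptions: the arrow fields carry Iceberg field ids in
+// their metadata under `PARQUET_FIELD_ID_META_KEY`, as the equality delete writer
+// below relies on; error handling elided):
+//
+//     let projector = RecordBatchProjector::new(
+//         arrow_schema,              // SchemaRef of the incoming batches
+//         &[1, 3],                   // Iceberg field ids to keep
+//         |field| {
+//             Ok(field
+//                 .metadata()
+//                 .get(PARQUET_FIELD_ID_META_KEY)
+//                 .and_then(|id| id.parse::<i64>().ok()))
+//         },
+//         |_| true,                  // descend into every struct field
+//     )?;
+//     let projected = projector.project_batch(batch)?;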
+
+#[cfg(test)]
+mod test {
+    use std::sync::Arc;
+
+    use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray, StructArray};
+    use arrow_schema::{DataType, Field, Fields, Schema};
+
+    use crate::arrow::record_batch_projector::RecordBatchProjector;
+    use crate::{Error, ErrorKind};
+
+    #[test]
+    fn test_record_batch_projector_nested_level() {
+        let inner_fields = vec![
+            Field::new("inner_field1", DataType::Int32, false),
+            Field::new("inner_field2", DataType::Utf8, false),
+        ];
+        let fields = vec![
+            Field::new("field1", DataType::Int32, false),
+            Field::new(
+                "field2",
+                DataType::Struct(Fields::from(inner_fields.clone())),
+                false,
+            ),
+        ];
+        let schema = Arc::new(Schema::new(fields));
+
+        let field_id_fetch_func = |field: &Field| match field.name().as_str() {
+            "field1" => Ok(Some(1)),
+            "field2" => Ok(Some(2)),
+            "inner_field1" => Ok(Some(3)),
+            "inner_field2" => Ok(Some(4)),
+            _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")),
+        };
+        let projector =
+            RecordBatchProjector::new(schema.clone(), &[1, 3], field_id_fetch_func, |_| true)
+                .unwrap();
+
+        assert_eq!(projector.field_indices.len(), 2);
+        assert_eq!(projector.field_indices[0], vec![0]);
+        assert_eq!(projector.field_indices[1], vec![0, 1]);
+
+        let int_array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+        let inner_int_array = Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef;
+        let inner_string_array = Arc::new(StringArray::from(vec!["x", "y", "z"])) as ArrayRef;
+        let struct_array = Arc::new(StructArray::from(vec![
+            (
+                Arc::new(inner_fields[0].clone()),
+                inner_int_array as ArrayRef,
+            ),
+            (
+                Arc::new(inner_fields[1].clone()),
+                inner_string_array as ArrayRef,
+            ),
+        ])) as ArrayRef;
+        let batch = RecordBatch::try_new(schema, vec![int_array, struct_array]).unwrap();
+
+        let projected_batch = projector.project_batch(batch).unwrap();
+        assert_eq!(projected_batch.num_columns(), 2);
+        let projected_int_array = projected_batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        let projected_inner_int_array = projected_batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+
+        assert_eq!(projected_int_array.values(), &[1, 2, 3]);
+        assert_eq!(projected_inner_int_array.values(), &[4, 5, 6]);
+    }
+
+    #[test]
+    fn test_field_not_found() {
+        let inner_fields = vec![
+            Field::new("inner_field1", DataType::Int32, false),
+            Field::new("inner_field2", DataType::Utf8, false),
+        ];
+
+        let fields = vec![
+            Field::new("field1", DataType::Int32, false),
+            Field::new(
+                "field2",
+                DataType::Struct(Fields::from(inner_fields.clone())),
+                false,
+            ),
+        ];
+        let schema = Arc::new(Schema::new(fields));
+
+        let field_id_fetch_func = |field: &Field| match field.name().as_str() {
+            "field1" => Ok(Some(1)),
+            "field2" => Ok(Some(2)),
+            "inner_field1" => Ok(Some(3)),
+            "inner_field2" => Ok(Some(4)),
+            _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")),
+        };
+        let projector =
+            RecordBatchProjector::new(schema.clone(), &[1, 5], field_id_fetch_func, |_| true);
+
+        assert!(projector.is_err());
+    }
+
+    #[test]
+    fn test_field_not_reachable() {
+        let inner_fields = vec![
+            Field::new("inner_field1", DataType::Int32, false),
+            Field::new("inner_field2", DataType::Utf8, false),
+        ];
+
+        let fields = vec![
+            Field::new("field1", DataType::Int32, false),
+            Field::new(
+                "field2",
+                DataType::Struct(Fields::from(inner_fields.clone())),
+                false,
+            ),
+        ];
+        let schema = Arc::new(Schema::new(fields));
+
+        let field_id_fetch_func = |field: &Field| match field.name().as_str() {
+            "field1" => Ok(Some(1)),
+            "field2" => Ok(Some(2)),
+            "inner_field1" => Ok(Some(3)),
+            "inner_field2" => Ok(Some(4)),
+            _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")),
+        };
+        let projector =
+            RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| false);
+        assert!(projector.is_err());
+
+        let projector =
+            RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| true);
+        assert!(projector.is_ok());
+    }
+}
diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
new file mode 100644
index 000000000..222961fc4
--- /dev/null
+++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -0,0 +1,503 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides `EqualityDeleteFileWriter`.
+
+use std::sync::Arc;
+
+use arrow_array::RecordBatch;
+use arrow_schema::{DataType, Field, SchemaRef as ArrowSchemaRef};
+use itertools::Itertools;
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+use crate::arrow::record_batch_projector::RecordBatchProjector;
+use crate::arrow::schema_to_arrow_schema;
+use crate::spec::{DataFile, SchemaRef, Struct};
+use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
+use crate::writer::{IcebergWriter, IcebergWriterBuilder};
+use crate::{Error, ErrorKind, Result};
+
+/// Builder for `EqualityDeleteFileWriter`.
+#[derive(Clone)]
+pub struct EqualityDeleteFileWriterBuilder<B: FileWriterBuilder> {
+    inner: B,
+}
+
+impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
+    /// Create a new `EqualityDeleteFileWriterBuilder` using a `FileWriterBuilder`.
+    pub fn new(inner: B) -> Self {
+        Self { inner }
+    }
+}
+
+/// Config for `EqualityDeleteFileWriter`.
+pub struct EqualityDeleteWriterConfig {
+    // Field ids used to determine row equality in equality delete files.
+    equality_ids: Vec<i32>,
+    // Projector used to project the data chunk into specific fields.
+    projector: RecordBatchProjector,
+    partition_value: Struct,
+}
+
+impl EqualityDeleteWriterConfig {
+    /// Create a new `EqualityDeleteWriterConfig` with the given equality ids.
+    pub fn new(
+        equality_ids: Vec<i32>,
+        original_schema: SchemaRef,
+        partition_value: Option<Struct>,
+    ) -> Result<Self> {
+        let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?);
+        let projector = RecordBatchProjector::new(
+            original_arrow_schema,
+            &equality_ids,
+            // The following rules come from https://iceberg.apache.org/spec/#identifier-field-ids
+            // - The identifier field ids must be used for primitive types.
+            // - The identifier field ids must not be used for floating point types or nullable fields.
+            // - The identifier field ids can be a nested field of a struct, but not a nested field of a nullable struct.
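+            // For example (see `test_equality_delete_unreachable_column` below): a required
+            // Int nested in a required struct qualifies, while a Float column, a nullable Int
+            // column, or any field nested in a nullable struct, map, or list does not.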
+            |field| {
+                // Only primitive types are allowed as identifier fields.
+                if field.is_nullable()
+                    || !field.data_type().is_primitive()
+                    || matches!(
+                        field.data_type(),
+                        DataType::Float16 | DataType::Float32 | DataType::Float64
+                    )
+                {
+                    return Ok(None);
+                }
+                Ok(Some(
+                    field
+                        .metadata()
+                        .get(PARQUET_FIELD_ID_META_KEY)
+                        .ok_or_else(|| {
+                            Error::new(ErrorKind::Unexpected, "Field metadata is missing.")
+                        })?
+                        .parse::<i64>()
+                        .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?,
+                ))
+            },
+            |field: &Field| !field.is_nullable(),
+        )?;
+        Ok(Self {
+            equality_ids,
+            projector,
+            partition_value: partition_value.unwrap_or(Struct::empty()),
+        })
+    }
+
+    /// Return the projected arrow schema.
+    pub fn projected_arrow_schema_ref(&self) -> &ArrowSchemaRef {
+        self.projector.projected_schema_ref()
+    }
+}
+
+#[async_trait::async_trait]
+impl<B: FileWriterBuilder> IcebergWriterBuilder for EqualityDeleteFileWriterBuilder<B> {
+    type R = EqualityDeleteFileWriter<B>;
+    type C = EqualityDeleteWriterConfig;
+
+    async fn build(self, config: Self::C) -> Result<Self::R> {
+        Ok(EqualityDeleteFileWriter {
+            inner_writer: Some(self.inner.clone().build().await?),
+            projector: config.projector,
+            equality_ids: config.equality_ids,
+            partition_value: config.partition_value,
+        })
+    }
+}
+
+/// Writer used to write equality delete files.
+pub struct EqualityDeleteFileWriter<B: FileWriterBuilder> {
+    inner_writer: Option<B::R>,
+    projector: RecordBatchProjector,
+    equality_ids: Vec<i32>,
+    partition_value: Struct,
+}
+
+#[async_trait::async_trait]
+impl<B: FileWriterBuilder> IcebergWriter for EqualityDeleteFileWriter<B> {
+    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
+        let batch = self.projector.project_batch(batch)?;
+        if let Some(writer) = self.inner_writer.as_mut() {
+            writer.write(&batch).await
+        } else {
+            Err(Error::new(
+                ErrorKind::Unexpected,
+                "Equality delete inner writer has been closed.",
+            ))
+        }
+    }
+
+    async fn close(&mut self) -> Result<Vec<DataFile>> {
+        if let Some(writer) = self.inner_writer.take() {
+            Ok(writer
+                .close()
+                .await?
+                .into_iter()
+                .map(|mut res| {
+                    res.content(crate::spec::DataContentType::EqualityDeletes);
+                    res.equality_ids(self.equality_ids.iter().copied().collect_vec());
+                    res.partition(self.partition_value.clone());
+                    res.build().expect("Failed to build data file.")
+                })
+                .collect_vec())
+        } else {
+            Err(Error::new(
+                ErrorKind::Unexpected,
+                "Equality delete inner writer has been closed.",
+            ))
+        }
+    }
+}
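+
+// A minimal end-to-end sketch (assuming an existing `ParquetWriterBuilder` named
+// `parquet_writer_builder`, a table `schema`, and an unpartitioned table; the parquet
+// writer should be created with the projected delete schema obtained via
+// `config.projected_arrow_schema_ref()`, as the test below does; error handling elided):
+//
+//     let config = EqualityDeleteWriterConfig::new(vec![1, 2], schema, None)?;
+//     let mut writer = EqualityDeleteFileWriterBuilder::new(parquet_writer_builder)
+//         .build(config)
+//         .await?;
+//     writer.write(batch).await?;   // the batch is projected down to the equality ids
+//     let delete_files = writer.close().await?;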
+
+#[cfg(test)]
+mod test {
+    use std::sync::Arc;
+
+    use arrow_array::types::Int32Type;
+    use arrow_array::{ArrayRef, Int32Array, RecordBatch, StructArray};
+    use arrow_schema::DataType;
+    use arrow_select::concat::concat_batches;
+    use itertools::Itertools;
+    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+    use parquet::file::properties::WriterProperties;
+    use tempfile::TempDir;
+
+    use crate::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
+    use crate::io::{FileIO, FileIOBuilder};
+    use crate::spec::{
+        DataFile, DataFileFormat, ListType, MapType, NestedField, PrimitiveType, Schema,
+        StructType, Type,
+    };
+    use crate::writer::base_writer::equality_delete_writer::{
+        EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
+    };
+    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
+    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
+    use crate::writer::file_writer::ParquetWriterBuilder;
+    use crate::writer::{IcebergWriter, IcebergWriterBuilder};
+
+    async fn check_parquet_data_file_with_equality_delete_write(
+        file_io: &FileIO,
+        data_file: &DataFile,
+        batch: &RecordBatch,
+    ) {
+        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
+
+        // read the written file
+        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
+        let input_content = input_file.read().await.unwrap();
+        let reader_builder =
+            ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();
+        let metadata = reader_builder.metadata().clone();
+
+        // check data
+        let reader = reader_builder.build().unwrap();
+        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
+        let res = concat_batches(&batch.schema(), &batches).unwrap();
+        assert_eq!(*batch, res);
+
+        // check metadata
+        let expect_column_num = batch.num_columns();
+
+        assert_eq!(
+            data_file.record_count,
+            metadata
+                .row_groups()
+                .iter()
+                .map(|group| group.num_rows())
+                .sum::<i64>() as u64
+        );
+
+        assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64);
+
+        assert_eq!(data_file.column_sizes.len(), expect_column_num);
+
+        for (index, id) in data_file.column_sizes().keys().sorted().enumerate() {
+            metadata
+                .row_groups()
+                .iter()
+                .map(|group| group.columns())
+                .for_each(|column| {
+                    assert_eq!(
+                        *data_file.column_sizes.get(id).unwrap() as i64,
+                        column.get(index).unwrap().compressed_size()
+                    );
+                });
+        }
+
+        assert_eq!(data_file.value_counts.len(), expect_column_num);
+        data_file.value_counts.iter().for_each(|(_, &v)| {
+            let expect = metadata
+                .row_groups()
+                .iter()
+                .map(|group| group.num_rows())
+                .sum::<i64>() as u64;
+            assert_eq!(v, expect);
+        });
+
+        for (index, id) in data_file.null_value_counts().keys().enumerate() {
+            let expect = batch.column(index).null_count() as u64;
+            assert_eq!(*data_file.null_value_counts.get(id).unwrap(), expect);
+        }
+
+        assert_eq!(data_file.split_offsets.len(), metadata.num_row_groups());
+        data_file
+            .split_offsets
+            .iter()
+            .enumerate()
+            .for_each(|(i, &v)| {
+                let expect = metadata.row_groups()[i].file_offset().unwrap();
+                assert_eq!(v, expect);
+            });
+    }
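+
+    // The test below writes a batch with columns
+    // (col0: Int, col1: Struct<Int>, col2: String, col3: List<Int>, col4: Struct<Struct<Int>>)
+    // using equality ids [0, 8]; the projected delete file therefore contains just two
+    // Int32 columns, `col0` and the innermost `col4.sub_col.sub_sub_col`.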
+
+    #[tokio::test]
+    async fn test_equality_delete_writer() -> Result<(), anyhow::Error> {
+        let temp_dir = TempDir::new().unwrap();
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let location_gen =
+            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+        // prepare data
+        // Int, Struct(Int), String, List(Int), Struct(Struct(Int))
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![
+                NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(
+                    1,
+                    "col1",
+                    Type::Struct(StructType::new(vec![NestedField::required(
+                        5,
+                        "sub_col",
+                        Type::Primitive(PrimitiveType::Int),
+                    )
+                    .into()])),
+                )
+                .into(),
+                NestedField::required(2, "col2", Type::Primitive(PrimitiveType::String)).into(),
+                NestedField::required(
+                    3,
+                    "col3",
+                    Type::List(ListType::new(
+                        NestedField::required(6, "element", Type::Primitive(PrimitiveType::Int))
+                            .into(),
+                    )),
+                )
+                .into(),
+                NestedField::required(
+                    4,
+                    "col4",
+                    Type::Struct(StructType::new(vec![NestedField::required(
+                        7,
+                        "sub_col",
+                        Type::Struct(StructType::new(vec![NestedField::required(
+                            8,
+                            "sub_sub_col",
+                            Type::Primitive(PrimitiveType::Int),
+                        )
+                        .into()])),
+                    )
+                    .into()])),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+        let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
+        let col0 = Arc::new(Int32Array::from_iter_values(vec![1; 1024])) as ArrayRef;
+        let col1 = Arc::new(StructArray::new(
+            if let DataType::Struct(fields) = arrow_schema.fields.get(1).unwrap().data_type() {
+                fields.clone()
+            } else {
+                unreachable!()
+            },
+            vec![Arc::new(Int32Array::from_iter_values(vec![1; 1024]))],
+            None,
+        ));
+        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
+            "test";
+            1024
+        ])) as ArrayRef;
+        let col3 = Arc::new({
+            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+                Some(vec![Some(1)]);
+                1024
+            ])
+            .into_parts();
+            arrow_array::ListArray::new(
+                if let DataType::List(field) = arrow_schema.fields.get(3).unwrap().data_type() {
+                    field.clone()
+                } else {
+                    unreachable!()
+                },
+                list_parts.1,
+                list_parts.2,
+                list_parts.3,
+            )
+        }) as ArrayRef;
+        let col4 = Arc::new(StructArray::new(
+            if let DataType::Struct(fields) = arrow_schema.fields.get(4).unwrap().data_type() {
+                fields.clone()
+            } else {
+                unreachable!()
+            },
+            vec![Arc::new(StructArray::new(
+                if let DataType::Struct(fields) = arrow_schema.fields.get(4).unwrap().data_type() {
+                    if let DataType::Struct(fields) = fields.first().unwrap().data_type() {
+                        fields.clone()
+                    } else {
+                        unreachable!()
+                    }
+                } else {
+                    unreachable!()
+                },
+                vec![Arc::new(Int32Array::from_iter_values(vec![1; 1024]))],
+                None,
+            ))],
+            None,
+        ));
+        let columns = vec![col0, col1, col2, col3, col4];
+        let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();
+
+        let equality_ids = vec![0_i32, 8];
+        let equality_config =
+            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap();
+        let delete_schema =
+            arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap();
+        let projector = equality_config.projector.clone();
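+
+        // Note that the parquet writer is created with the *projected* delete schema
+        // rather than the full table schema, so the written file matches the columns
+        // the equality delete writer emits after projection.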
+
+        // prepare writer
+        let pb = ParquetWriterBuilder::new(
+            WriterProperties::builder().build(),
+            Arc::new(delete_schema),
+            file_io.clone(),
+            location_gen,
+            file_name_gen,
+        );
+
+        let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb)
+            .build(equality_config)
+            .await?;
+
+        // write
+        equality_delete_writer.write(to_write.clone()).await?;
+        let res = equality_delete_writer.close().await?;
+        assert_eq!(res.len(), 1);
+        let data_file = res.into_iter().next().unwrap();
+
+        // check
+        let to_write_projected = projector.project_batch(to_write)?;
+        check_parquet_data_file_with_equality_delete_write(
+            &file_io,
+            &data_file,
+            &to_write_projected,
+        )
+        .await;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_equality_delete_unreachable_column() -> Result<(), anyhow::Error> {
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(0, "col0", Type::Primitive(PrimitiveType::Float)).into(),
+                    NestedField::required(1, "col1", Type::Primitive(PrimitiveType::Double)).into(),
+                    NestedField::optional(2, "col2", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(
+                        3,
+                        "col3",
+                        Type::Struct(StructType::new(vec![NestedField::required(
+                            4,
+                            "sub_col",
+                            Type::Primitive(PrimitiveType::Int),
+                        )
+                        .into()])),
+                    )
+                    .into(),
+                    NestedField::optional(
+                        5,
+                        "col4",
+                        Type::Struct(StructType::new(vec![NestedField::required(
+                            6,
+                            "sub_col2",
+                            Type::Primitive(PrimitiveType::Int),
+                        )
+                        .into()])),
+                    )
+                    .into(),
+                    NestedField::required(
+                        7,
+                        "col5",
+                        Type::Map(MapType::new(
+                            Arc::new(NestedField::required(
+                                8,
+                                "key",
+                                Type::Primitive(PrimitiveType::String),
+                            )),
+                            Arc::new(NestedField::required(
+                                9,
+                                "value",
+                                Type::Primitive(PrimitiveType::Int),
+                            )),
+                        )),
+                    )
+                    .into(),
+                    NestedField::required(
+                        10,
+                        "col6",
+                        Type::List(ListType::new(Arc::new(NestedField::required(
+                            11,
+                            "element",
+                            Type::Primitive(PrimitiveType::Int),
+                        )))),
+                    )
+                    .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+        // Float and Double are not allowed to be used for equality delete
+        assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None).is_err());
+        // A nullable Int is not allowed to be used for equality delete
+        assert!(EqualityDeleteWriterConfig::new(vec![2], schema.clone(), None).is_err());
+        // A struct itself is not allowed to be used for equality delete
+        assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None).is_err());
+        // A nested field of a required struct is allowed to be used for equality delete
+        assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None).is_ok());
+        // A nested field of an optional struct is not allowed to be used for equality delete
+        assert!(EqualityDeleteWriterConfig::new(vec![6], schema.clone(), None).is_err());
+        // Nested fields of a map are not allowed to be used for equality delete
+        assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone(), None).is_err());
+        // Nested fields of a list are not allowed to be used for equality delete
+        assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone(), None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone(), None).is_err());
+
+        Ok(())
+    }
+}
diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs
index 37da2ab81..37ab97eb6 100644
--- a/crates/iceberg/src/writer/base_writer/mod.rs
+++ b/crates/iceberg/src/writer/base_writer/mod.rs
@@ -18,3 +18,4 @@
 //! Base writer module contains the basic writers provided by iceberg: `DataFileWriter`, `PositionDeleteFileWriter`, `EqualityDeleteFileWriter`.
 
 pub mod data_file_writer;
+pub mod equality_delete_writer;