add position delete writer support
ZENOTME committed Nov 19, 2024
1 parent b2fb803 commit 03f49ae
Showing 3 changed files with 335 additions and 1 deletion.
15 changes: 14 additions & 1 deletion crates/iceberg/src/arrow/schema.rs
@@ -27,7 +27,7 @@ use arrow_array::{
BooleanArray, Date32Array, Datum as ArrowDatum, Float32Array, Float64Array, Int32Array,
Int64Array, PrimitiveArray, Scalar, StringArray, TimestampMicrosecondArray,
};
-use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
+use arrow_schema::{DataType, Field, FieldRef, Fields, Schema as ArrowSchema, TimeUnit};
use bitvec::macros::internal::funty::Fundamental;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use parquet::file::statistics::Statistics;
@@ -607,6 +607,19 @@ impl SchemaVisitor for ToArrowSchemaConverter {
}
}

/// Convert iceberg field to an arrow field.
pub fn field_to_arrow_field(field: &crate::spec::NestedFieldRef) -> Result<FieldRef> {
let mut converter = ToArrowSchemaConverter;
converter.before_struct_field(field)?;
let result = crate::spec::visit_type(&field.field_type, &mut converter)?;
converter.after_struct_field(field)?;
let result = converter.field(field, result)?;
match result {
ArrowSchemaOrFieldOrType::Field(field) => Ok(field.into()),
_ => unreachable!(),
}
}

/// Convert iceberg schema to an arrow schema.
pub fn schema_to_arrow_schema(schema: &crate::spec::Schema) -> crate::Result<ArrowSchema> {
let mut converter = ToArrowSchemaConverter;
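
For reference, a minimal sketch of calling the new `field_to_arrow_field` helper from outside the crate; it assumes the function is re-exported from the public `arrow` module (as the `use crate::arrow::field_to_arrow_field` import in the new writer below suggests), and it reuses the same `NestedField`/`Type` constructors that appear in this commit:

use std::sync::Arc;
use iceberg::spec::{NestedField, PrimitiveType, Type};

fn example() -> iceberg::Result<()> {
    // Convert a single required long field (field id 1) into an Arrow `FieldRef`.
    let field = Arc::new(NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)));
    let arrow_field = iceberg::arrow::field_to_arrow_field(&field)?;
    assert_eq!(arrow_field.name(), "id");
    Ok(())
}
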
1 change: 1 addition & 0 deletions crates/iceberg/src/writer/base_writer/mod.rs
@@ -18,3 +18,4 @@
//! Base writer module contains the basic writers provided by iceberg: `DataFileWriter`, `PositionDeleteFileWriter`, `EqualityDeleteFileWriter`.
pub mod data_file_writer;
pub mod position_delete_file_writer;
320 changes: 320 additions & 0 deletions crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs
@@ -0,0 +1,320 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Position delete file writer.
use std::collections::BTreeMap;
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
use arrow_schema::{Schema, SchemaRef};
use once_cell::sync::Lazy;

use crate::arrow::field_to_arrow_field;
use crate::spec::{DataFile, NestedField, PrimitiveType, Struct, Type};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::Result;

/// Builder for `MemoryPositionDeleteWriter`.
#[derive(Clone)]
pub struct MemoryPositionDeleteWriterBuilder<B: FileWriterBuilder> {
inner: B,
}

impl<B: FileWriterBuilder> MemoryPositionDeleteWriterBuilder<B> {
/// Create a new `MemoryPositionDeleteWriterBuilder` using a `FileWriterBuilder`.
pub fn new(inner: B) -> Self {
Self { inner }
}
}

/// The config for `MemoryPositionDeleteWriter`.
pub struct MemoryPositionDeleteWriterConfig {
/// The maximum number of distinct data file paths to cache in memory before flushing to a delete file.
pub cache_num: usize,
/// The partition value of the position delete file.
pub partition_value: Struct,
}

impl MemoryPositionDeleteWriterConfig {
/// Create a new `MemoryPositionDeleteWriterConfig`.
pub fn new(cache_num: usize, partition_value: Option<Struct>) -> Self {
Self {
cache_num,
partition_value: partition_value.unwrap_or(Struct::empty()),
}
}
}
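
// Illustrative sketch, not part of this commit: one way a caller might assemble the
// writer from any `FileWriterBuilder` (the test below uses `ParquetWriterBuilder`),
// caching up to 1000 data file paths for an unpartitioned table. The function name
// and cache size are placeholders.
async fn example_build_writer<B: FileWriterBuilder>(
    file_writer_builder: B,
) -> Result<MemoryPositionDeleteWriter<B>> {
    let config = MemoryPositionDeleteWriterConfig::new(1000, None);
    MemoryPositionDeleteWriterBuilder::new(file_writer_builder)
        .build(config)
        .await
}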

/// Arrow schema of position delete files: `file_path` (string) and `pos` (long), using
/// the field ids reserved for position deletes by the Iceberg spec.
static POSITION_DELETE_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
Schema::new(vec![
// 2147483546 is the reserved field id for `file_path`: the data file containing the deleted row.
field_to_arrow_field(&Arc::new(NestedField::required(
2147483546,
"file_path",
Type::Primitive(PrimitiveType::String),
)))
.unwrap(),
// 2147483545 is the reserved field id for `pos`: the 0-based row position within that file.
field_to_arrow_field(&Arc::new(NestedField::required(
2147483545,
"pos",
Type::Primitive(PrimitiveType::Long),
)))
.unwrap(),
])
.into()
});

#[async_trait::async_trait]
impl<'a, B: FileWriterBuilder> IcebergWriterBuilder<PositionDeleteInput<'a>, Vec<DataFile>>
for MemoryPositionDeleteWriterBuilder<B>
{
type R = MemoryPositionDeleteWriter<B>;
type C = MemoryPositionDeleteWriterConfig;

async fn build(self, config: Self::C) -> Result<Self::R> {
Ok(MemoryPositionDeleteWriter {
inner_writer_builder: self.inner.clone(),
cache_num: config.cache_num,
cache: BTreeMap::new(),
data_files: Vec::new(),
partition_value: config.partition_value,
})
}
}

/// Position delete input: identifies one deleted row by data file path and row position.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct PositionDeleteInput<'a> {
/// Path of the data file containing the deleted row.
pub path: &'a str,
/// 0-based position (row offset) of the deleted row within that file.
pub offset: i64,
}

/// Position delete writer that buffers deletes in memory, keyed by data file path, and
/// flushes them to position delete files through the inner `FileWriterBuilder`.
pub struct MemoryPositionDeleteWriter<B: FileWriterBuilder> {
inner_writer_builder: B,
cache_num: usize,
cache: BTreeMap<String, Vec<i64>>,
data_files: Vec<DataFile>,
partition_value: Struct,
}

impl<'a, B: FileWriterBuilder> MemoryPositionDeleteWriter<B> {
async fn write_cache_out(&mut self) -> Result<()> {
let mut keys = Vec::new();
let mut values = Vec::new();
let mut cache = std::mem::take(&mut self.cache);
// Flatten the cached deletes into two parallel columns; the BTreeMap keeps file paths
// ordered and offsets are sorted per file, so rows come out sorted by (file_path, pos).
for (key, offsets) in cache.iter_mut() {
offsets.sort();
let key_ref = key.as_str();
for offset in offsets {
keys.push(key_ref);
values.push(*offset);
}
}
let key_array = Arc::new(StringArray::from(keys)) as ArrayRef;
let value_array = Arc::new(Int64Array::from(values)) as ArrayRef;
let record_batch =
RecordBatch::try_new(POSITION_DELETE_SCHEMA.clone(), vec![key_array, value_array])?;
let mut writer = self.inner_writer_builder.clone().build().await?;
writer.write(&record_batch).await?;
self.data_files
.extend(writer.close().await?.into_iter().map(|mut res| {
res.content(crate::spec::DataContentType::PositionDeletes);
res.partition(self.partition_value.clone());
res.build().expect("Guaranteed to be valid")
}));
Ok(())
}
}
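
// Worked example of the flush above (illustrative): with a cache of
// {"b.parquet": [7, 3], "a.parquet": [2]}, the BTreeMap iteration plus the per-file
// sort produce a single record batch ordered by (file_path, pos):
//   file_path  | pos
//   a.parquet  | 2
//   b.parquet  | 3
//   b.parquet  | 7
// The batch is written through a fresh inner `FileWriter`, and the resulting files are
// recorded as position delete `DataFile`s carrying the configured partition value.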

/// Implement `IcebergWriter` for `MemoryPositionDeleteWriter`.
impl<'a, B: FileWriterBuilder> IcebergWriter<PositionDeleteInput<'a>>
for MemoryPositionDeleteWriter<B>
{
#[doc = " Write data to iceberg table."]
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn write<'life0, 'async_trait>(
&'life0 mut self,
input: PositionDeleteInput<'a>,
) -> ::core::pin::Pin<
Box<dyn ::core::future::Future<Output = Result<()>> + ::core::marker::Send + 'async_trait>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
// Fast path: write to cache directly.
if self.cache.len() < self.cache_num {
if let Some(v) = self.cache.get_mut(input.path) {
v.push(input.offset);
} else {
self.cache
.insert(input.path.to_string(), vec![input.offset]);
}
return Box::pin(async move { Ok(()) });
}

// Slow path: the cache is full, so flush it to a delete file before caching the new offset.
let path = input.path.to_string();
Box::pin(async move {
self.write_cache_out().await?;
self.cache.insert(path, vec![input.offset]);
Ok(())
})
}

#[doc = " Close the writer and return the written data files."]
#[doc = " If close failed, the data written before maybe be lost. User may need to recreate the writer and rewrite the data again."]
#[doc = " # NOTE"]
#[doc = " After close, regardless of success or failure, the writer should never be used again, otherwise the writer will panic."]
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn close<'life0, 'async_trait>(
&'life0 mut self,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = Result<Vec<DataFile>>>
+ ::core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
Box::pin(async move {
self.write_cache_out().await?;
Ok(std::mem::take(&mut self.data_files))
})
}
}
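
// Illustrative usage sketch, not part of this commit: writing a few position deletes
// and closing the writer. Once `cache_num` distinct paths are cached, the next write
// flushes the cache to a delete file, so `close` can return more than one file. The
// function name and file paths are placeholders.
async fn example_write_deletes<B: FileWriterBuilder>(
    mut writer: MemoryPositionDeleteWriter<B>,
) -> Result<Vec<DataFile>> {
    writer
        .write(PositionDeleteInput {
            path: "s3://bucket/data/file1.parquet",
            offset: 42,
        })
        .await?;
    writer
        .write(PositionDeleteInput {
            path: "s3://bucket/data/file2.parquet",
            offset: 0,
        })
        .await?;
    // Flushes the remaining cached deletes and returns the written delete files.
    writer.close().await
}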

#[cfg(test)]
mod test {
use std::sync::Arc;

use arrow_array::{Int64Array, StringArray};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::file::properties::WriterProperties;
use tempfile::TempDir;

use crate::arrow::arrow_schema_to_schema;
use crate::io::FileIOBuilder;
use crate::spec::{DataContentType, DataFileFormat, Struct};
use crate::writer::base_writer::position_delete_file_writer::{
MemoryPositionDeleteWriterBuilder, MemoryPositionDeleteWriterConfig, PositionDeleteInput,
POSITION_DELETE_SCHEMA,
};
use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
use crate::writer::file_writer::ParquetWriterBuilder;
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::Result;

#[tokio::test]
async fn test_position_delete_writer() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
let file_io = FileIOBuilder::new("memory").build().unwrap();
let location_gen =
MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
let file_name_gen =
DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

let pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
Arc::new(arrow_schema_to_schema(&POSITION_DELETE_SCHEMA).unwrap()),
file_io.clone(),
location_gen,
file_name_gen,
);
let mut position_delete_writer = MemoryPositionDeleteWriterBuilder::new(pw)
.build(MemoryPositionDeleteWriterConfig::new(10, None))
.await?;

// Write some position delete inputs
let mut inputs = [
PositionDeleteInput {
path: "file2.parquet",
offset: 2,
},
PositionDeleteInput {
path: "file2.parquet",
offset: 1,
},
PositionDeleteInput {
path: "file2.parquet",
offset: 3,
},
PositionDeleteInput {
path: "file3.parquet",
offset: 2,
},
PositionDeleteInput {
path: "file1.parquet",
offset: 5,
},
PositionDeleteInput {
path: "file1.parquet",
offset: 4,
},
PositionDeleteInput {
path: "file1.parquet",
offset: 1,
},
];
for input in inputs.iter() {
position_delete_writer.write(*input).await?;
}

let data_files = position_delete_writer.close().await.unwrap();
assert_eq!(data_files.len(), 1);
assert_eq!(data_files[0].file_format, DataFileFormat::Parquet);
assert_eq!(data_files[0].content, DataContentType::PositionDeletes);
assert_eq!(data_files[0].partition, Struct::empty());

let parquet_file = file_io
.new_input(&data_files[0].file_path)?
.read()
.await
.unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(parquet_file).unwrap();
let reader = builder.build().unwrap();
let batches = reader.map(|x| x.unwrap()).collect::<Vec<_>>();

let path_column = batches[0]
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let offset_column = batches[0]
.column(1)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();

inputs.sort_by(|a, b| a.path.cmp(b.path).then_with(|| a.offset.cmp(&b.offset)));
for (i, input) in inputs.iter().enumerate() {
assert_eq!(path_column.value(i), input.path);
assert_eq!(offset_column.value(i), input.offset);
}

Ok(())
}
}
