add position delete writer support
ZENOTME committed Nov 28, 2024
1 parent f3a571d commit 803cd39
Showing 2 changed files with 285 additions and 0 deletions.
1 change: 1 addition & 0 deletions crates/iceberg/src/writer/base_writer/mod.rs
@@ -19,3 +19,4 @@
pub mod data_file_writer;
pub mod equality_delete_writer;
pub mod position_delete_file_writer;
284 changes: 284 additions & 0 deletions crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs
@@ -0,0 +1,284 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Position delete file writer.
//!
//! A position delete file marks rows as deleted by recording
//! `(file_path, row position)` pairs.
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use arrow_array::builder::{PrimitiveBuilder, StringBuilder};
use arrow_array::types::Int64Type;
use arrow_array::RecordBatch;
use once_cell::sync::Lazy;

use crate::arrow::schema_to_arrow_schema;
use crate::spec::{DataContentType, DataFile, NestedField, PrimitiveType, Schema, Struct, Type};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

/// The config for `PositionDeleteWriter`.
pub struct PositionDeleteWriterConfig {
    /// The partition value of the position delete file.
    pub partition_value: Struct,
}

impl PositionDeleteWriterConfig {
    /// Create a new `PositionDeleteWriterConfig`. Passing `None` yields an empty
    /// partition value, as for an unpartitioned table.
    pub fn new(partition_value: Option<Struct>) -> Self {
        Self {
            partition_value: partition_value.unwrap_or(Struct::empty()),
        }
    }
}

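// Field IDs 2147483546 (`file_path`) and 2147483545 (`pos`) are reserved by the
// Iceberg spec for the columns of position delete files.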
static POSITION_DELETE_SCHEMA: Lazy<Schema> = Lazy::new(|| {
    Schema::builder()
        .with_fields(vec![
            Arc::new(NestedField::required(
                2147483546,
                "file_path",
                Type::Primitive(PrimitiveType::String),
            )),
            Arc::new(NestedField::required(
                2147483545,
                "pos",
                Type::Primitive(PrimitiveType::Long),
            )),
        ])
        .build()
        .unwrap()
});

/// Position delete input.
#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Debug)]
pub struct PositionDeleteInput<'a> {
    /// The path of the data file the delete applies to.
    path: &'a str,
    /// The row position within that file to mark as deleted.
    offset: i64,
}

impl<'a> PositionDeleteInput<'a> {
    /// Create a new `PositionDeleteInput`.
    pub fn new(path: &'a str, offset: i64) -> Self {
        PositionDeleteInput { path, offset }
    }
}

/// Builder for `PositionDeleteWriter`.
#[derive(Clone)]
pub struct PositionDeleteWriterBuilder<B: FileWriterBuilder> {
    inner: B,
}

impl<B: FileWriterBuilder> PositionDeleteWriterBuilder<B> {
    /// Create a new `PositionDeleteWriterBuilder` using a `FileWriterBuilder`.
    pub fn new(inner: B) -> Self {
        Self { inner }
    }
}

#[async_trait::async_trait]
impl<'a, B: FileWriterBuilder> IcebergWriterBuilder<Vec<PositionDeleteInput<'a>>>
    for PositionDeleteWriterBuilder<B>
{
    type R = PositionDeleteWriter<B>;
    type C = PositionDeleteWriterConfig;

    async fn build(self, config: Self::C) -> Result<Self::R> {
        Ok(PositionDeleteWriter {
            inner_writer: Some(self.inner.build().await?),
            partition_value: config.partition_value,
        })
    }
}

/// Position delete writer.
pub struct PositionDeleteWriter<B: FileWriterBuilder> {
    inner_writer: Option<B::R>,
    partition_value: Struct,
}

// Implement `IcebergWriter` for `PositionDeleteWriter`.
impl<'a, B: FileWriterBuilder> IcebergWriter<Vec<PositionDeleteInput<'a>>>
    for PositionDeleteWriter<B>
{
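    // `write` is a hand-desugared async fn so the record batch can be built from
    // the borrowed `input` before the future is boxed; the returned future then
    // does not need to capture the `'a` lifetime of the input paths.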
    fn write<'life0, 'async_trait>(
        &'life0 mut self,
        input: Vec<PositionDeleteInput<'a>>,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait,
    {
        // Construct record batch using input.
        let mut path_column_builder = StringBuilder::new();
        let mut offset_column_builder = PrimitiveBuilder::<Int64Type>::new();
        for input in input.into_iter() {
            path_column_builder.append_value(input.path);
            offset_column_builder.append_value(input.offset);
        }
        let record_batch = RecordBatch::try_new(
            Arc::new(schema_to_arrow_schema(&POSITION_DELETE_SCHEMA).unwrap()),
            vec![
                Arc::new(path_column_builder.finish()),
                Arc::new(offset_column_builder.finish()),
            ],
        )
        .map_err(|e| Error::new(ErrorKind::DataInvalid, e.to_string()));
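        // `record_batch` is still a `Result` here; the `?` below surfaces any arrow
        // construction error when the future is polled.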

        Box::pin(async move {
            if let Some(inner_writer) = &mut self.inner_writer {
                inner_writer.write(&record_batch?).await?;
            } else {
                return Err(Error::new(ErrorKind::Unexpected, "writer has been closed"));
            }
            Ok(())
        })
    }

    fn close<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> Pin<Box<dyn Future<Output = Result<Vec<DataFile>>> + Send + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait,
    {
        Box::pin(async move {
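            // `take` leaves `None` behind, so a later `write` returns an error and
            // a second `close` would panic on the `unwrap` below.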
            let writer = self.inner_writer.take().unwrap();
            Ok(writer
                .close()
                .await?
                .into_iter()
                .map(|mut res| {
                    res.content(DataContentType::PositionDeletes);
                    res.partition(self.partition_value.clone());
                    res.build().expect("Guaranteed to be valid")
                })
                .collect())
        })
    }
}

#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow_array::{Int64Array, StringArray};
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use crate::io::FileIOBuilder;
    use crate::spec::{DataContentType, DataFileFormat, Struct};
    use crate::writer::base_writer::position_delete_file_writer::{
        PositionDeleteInput, PositionDeleteWriterBuilder, PositionDeleteWriterConfig,
        POSITION_DELETE_SCHEMA,
    };
    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::{IcebergWriter, IcebergWriterBuilder};
    use crate::Result;

    #[tokio::test]
    async fn test_position_delete_writer() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new("memory").build().unwrap();
        let location_gen =
            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let pw = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            Arc::new(POSITION_DELETE_SCHEMA.clone()),
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut position_delete_writer = PositionDeleteWriterBuilder::new(pw)
            .build(PositionDeleteWriterConfig::new(None))
            .await?;

        // Write some position delete inputs.
        let inputs: Vec<PositionDeleteInput> = vec![
            PositionDeleteInput {
                path: "file2.parquet",
                offset: 2,
            },
            PositionDeleteInput {
                path: "file2.parquet",
                offset: 1,
            },
            PositionDeleteInput {
                path: "file2.parquet",
                offset: 3,
            },
            PositionDeleteInput {
                path: "file3.parquet",
                offset: 2,
            },
            PositionDeleteInput {
                path: "file1.parquet",
                offset: 5,
            },
            PositionDeleteInput {
                path: "file1.parquet",
                offset: 4,
            },
            PositionDeleteInput {
                path: "file1.parquet",
                offset: 1,
            },
        ];
        position_delete_writer.write(inputs.clone()).await?;

        let data_files = position_delete_writer.close().await.unwrap();
        assert_eq!(data_files.len(), 1);
        assert_eq!(data_files[0].file_format, DataFileFormat::Parquet);
        assert_eq!(data_files[0].content, DataContentType::PositionDeletes);
        assert_eq!(data_files[0].partition, Struct::empty());

        let parquet_file = file_io
            .new_input(&data_files[0].file_path)?
            .read()
            .await
            .unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(parquet_file).unwrap();
        let reader = builder.build().unwrap();
        let batches = reader.map(|x| x.unwrap()).collect::<Vec<_>>();

        let path_column = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let offset_column = batches[0]
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();

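        // Rows come back in insertion order, so the batch can be compared
        // index-by-index against the original inputs.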
        for (i, input) in inputs.iter().enumerate() {
            assert_eq!(path_column.value(i), input.path);
            assert_eq!(offset_column.value(i), input.offset);
        }

        Ok(())
    }
}
