Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support arrow_struct_to_iceberg_struct #731

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

ZENOTME
Copy link
Contributor

@ZENOTME ZENOTME commented Nov 28, 2024

This PR introduces the function to convert arrow struct to iceberg struct. This function is needed when we add fanout partition writer: In this writer, we need to compute the partition value using record batch and convert them into struct value finally and set into data file.

@ZENOTME ZENOTME force-pushed the arrow_value_convert branch 2 times, most recently from 1ffc802 to 0830aae Compare November 28, 2024 12:00
@ZENOTME
Copy link
Contributor Author

ZENOTME commented Nov 28, 2024

cc @liurenjie1024 @Fokko @Xuanwo

@liurenjie1024
Copy link
Contributor

Hi, @ZENOTME Thanks for this pr! I'm thinking that instead of array transformation, should we consider transforming arrow record batch to/from array of iceberg datum? It maybe also worthy to have a visitor pattern.

@ZENOTME ZENOTME force-pushed the arrow_value_convert branch from 0830aae to cc09bff Compare December 4, 2024 07:40
@ZENOTME
Copy link
Contributor Author

ZENOTME commented Dec 4, 2024

Hi, @ZENOTME Thanks for this pr! I'm thinking that instead of array transformation, should we consider transforming arrow record batch to/from array of iceberg datum?

For now, this function is mainly used in partition writer. And we store partition value as Struct so we need to transform it to literal now.

It maybe also worthy to have a visitor pattern.

Good point. The benefit of visitor patterns is to make it more convenient to convert between different types, e.g. datum or literal. I try to use visitor patterns for this PR so that we can add more type conversions in the future. But I'm not sure whether it's a good design. Feel free to let me know if there are some other API designs.

Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ZENOTME for this pr, I have some suggestions about it.

&self,
array: &NullArray,
arrow_type: &DataType,
iceberg_type: &PrimitiveType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hesitating to add an iceberg_type here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't think we need an arrow_type, arrow array has data type associated with it: https://docs.rs/arrow/latest/arrow/array/trait.Array.html#tymethod.data_type

use crate::spec::{Literal, PrimitiveType, Struct, StructType, Type};
use crate::{Error, ErrorKind, Result};

trait ArrowArrayVistor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need have clear documentation about what kind of visitor it is. Is it an post order visitor?

Copy link
Contributor

@Fokko Fokko Dec 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Iceberg, everything is post-order, except assigning field-IDs. I think in this case post-order makes sense, since you would first convert the fields of the struct, and then move up to nested-structs, lists, maps, structs, all the way up to the schema struct.

crates/iceberg/src/arrow/value.rs Outdated Show resolved Hide resolved
@Fokko Fokko changed the title feat: support arrow_struct_to_iceberg_struct feat: support arrow_struct_to_iceberg_struct Dec 6, 2024

let mut columns = Vec::with_capacity(array.columns().len());

for ((array, arrow_type), iceberg_field) in array
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looping back @liurenjie1024 question here:

We need have clear documentation about what kind of visitor it is.

First of all, thanks for converting this into a visitor, I think that's the right approach here. One question regarding the lookup. It looks like we're expecting the Arrow data to be in the same position of the Iceberg schema. Instead, what would be more Iceberg-esque, is performing the lookup by field-ID.

For example:

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

table {
  1: city: optional string
  2: lat: optional double
  3: long: optional double
}

table = catalog.create_table("default.locations", schema)

with table.update_schema() as update:
    update.move_after("lat", "long")


table {
  1: city: optional string
  3: lat: optional double
  2: long: optional double
}

Now the fields are switched. Data that's already written needs to be projected, and new data can be read as is. Both of the files will be read by this visitor without raising an error, but for the old files, the data the fields will be switched.

This way, we could also do smart things, like fill in nullable fields:

        let struct_array = StructArray::new_null();
        let iceberg_struct_type = StructType::new(vec![Arc::new(NestedField::optional(
            0,
            "bool_field",
            Type::Primitive(PrimitiveType::Boolean),
        ))]);
        let result = arrow_struct_to_literal(&struct_array, iceberg_struct_type).unwrap();
        assert_eq!(result, vec![None; 3]);

And in a step after that, #737. You can find the Python equivalent here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use field-ID require user store the file id metadata for date type in struct type. I'm not sure whether is this always set up. How about providing two visit function? One would assume the schema of arrow will be same as iceberg schema, it will return error if convert fail. And the other one will look up the column using the field id.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the other one will look up the column using the field id.

+1. I think we should allow user to choose.

Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ZENOTME 's effort. I saw that both java/python have schema with partner visitor:

  1. SchemaWithPartnerVisitor
  2. https://github.com/apache/iceberg-python/blob/bfc0d9a62176803094da0867ee793808f105d352/pyiceberg/io/pyarrow.py#L1726

I'm thinking that we should adopt similar design?

Comment on lines 37 to 42
fn int16(&self, array: &Int16Array, iceberg_type: &PrimitiveType) -> Result<Vec<Self::T>>;
fn int32(&self, array: &Int32Array, iceberg_type: &PrimitiveType) -> Result<Vec<Self::T>>;
fn int64(&self, array: &Int64Array, iceberg_type: &PrimitiveType) -> Result<Vec<Self::T>>;
fn float(&self, array: &Float32Array, iceberg_type: &PrimitiveType) -> Result<Vec<Self::T>>;
fn double(&self, array: &Float64Array, iceberg_type: &PrimitiveType) -> Result<Vec<Self::T>>;
fn decimal(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider condense these into to one method? Listing all primitive types seems too cumbersome to me.


/// A post order arrow array visitor.
/// # TODO
/// - Add support for ListArray, MapArray
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can understand that we don't want to support ListArray, MapArray for now, but we should throw unsupport feature in implementation. As an interface, we should still take into account those types.

@ZENOTME
Copy link
Contributor Author

ZENOTME commented Dec 16, 2024

Thanks @ZENOTME 's effort. I saw that both java/python have schema with partner visitor:

  1. SchemaWithPartnerVisitor
  2. https://github.com/apache/iceberg-python/blob/bfc0d9a62176803094da0867ee793808f105d352/pyiceberg/io/pyarrow.py#L1726

I'm thinking that we should adopt similar design?

I tried to adopt the SchemaWithPartner design, but I find the interface is not so general. https://github.com/apache/iceberg-python/blob/bfc0d9a62176803094da0867ee793808f105d352/pyiceberg/io/pyarrow.py#L1726 is not for array convert I think. To make it suitable for arrow array convert, I make some change to it:

  1. Introduce ListIterator and MapIterator, the control flow of list and map is not so suitable for arrow array convert.
  2. array in arrow rust can be NullArray, so we need some preorder access before diving into the subcolumn. So I introduce visit_type_before to solve it.

I think we can still implement e.g. arrow array project and cast using the changed interface, but I'm not sure whether the change is reasonable. There are some errors in this PR, I will fix them if we reach a consensus on this design.


/// Called before every type, if this function return `Some`, the following visiting will be skipped.
/// This function used to implement early return.
fn visit_type_before(&mut self, _ty: &Type, _partner: &P) -> Result<Option<Self::T>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we should add this part. I think this is an optimization to prune some fields, right? Instead adding this, why we don't prune unnecessary fields before visiting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we don't prune unnecessary fields before visiting?

I think this prune will also incur visits recursively. Let me explain why we need this prune. In arrow, the struct array maybe represent as a nullarray which means that all child field are null. So in this case, we need to recognize this and stop to visit child column. But in our visitor pattern, we can't control this, the control path is in the visit_function rather than vistor. So we need to use this function to notify the visit function that stop visit child column if this function return Some.

results: Vec<Self::T>,
) -> Result<Self::T>;
/// Called after list fields visited.
fn list(&mut self, list: &ListType, partner: &P, value: Vec<Self::T>) -> Result<Self::T>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the value of a Vec?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the visit fucnton, this function will be called after visit all child column. So the Vec means the result return by all child columns, and then this function may combine this result.

&mut self,
map: &MapType,
partner: &P,
key_value: Vec<Self::T>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar like struct, In the visit fucnton, this function will be called after visit all row. So the Vec means the result return by all row, and then this function may combine this result.

Type::List(list) => {
let mut results = Vec::new();
let mut list_element_partner_iter = accessor.list_element_partner(partner)?;
if let Some(list_element_partner) = list_element_partner_iter.next() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem quite in efficient. It should be the actual visitor's reponsibility to do visit partner. If I understand correctly, this is because you have trouble when dealing with list array of arrow? In this case, you can get inner child array by using array_data()[0].

Copy link
Contributor Author

@ZENOTME ZENOTME Dec 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem quite in efficient. It should be the actual visitor's reponsibility to do visit partner.

For now, our visit pattern is that the control logic is in visit function. The visitor only take care of how to access data. If we let the actual visitor's reponsibility to do visit partner, which means that it need to know access which partner, e.g it need to know some index I think. 🤔 And the interface should looks like:

fn before_list_element(&mut self, _field: &NestedFieldRef, _partner: &P, element_idx: usize) -> Result<()> {
       Ok(())
   }

Is that what you means?

Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ZENOTME for this pr. I have some concerns about the processing of list/map

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants