feat: support arrow_struct_to_iceberg_struct
#731
base: main
Conversation
Force-pushed 1ffc802 to 0830aae
Hi @ZENOTME, thanks for this PR! I'm thinking that instead of array transformation, should we consider transforming an arrow record batch to/from an array of iceberg datums? It may also be worthwhile to have a visitor pattern.
Force-pushed 0830aae to cc09bff
For now, this function is mainly used in the partition writer. We store the partition value as a Struct, so we need to transform it into a literal.
Good point. The benefit of a visitor pattern is that it makes it more convenient to convert between different types, e.g. datum or literal. I tried to use a visitor pattern in this PR so that we can add more type conversions in the future, but I'm not sure whether it's a good design. Feel free to let me know if there are other API designs.
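As a rough illustration of that benefit (simplified, hypothetical types, not the actual iceberg-rust API): giving the visitor an associated output type lets the same traversal produce literals, datums, or anything else, just by swapping the visitor implementation.

```rust
// Toy stand-ins for arrow arrays and iceberg literals (hypothetical,
// far simpler than the real types in crates/iceberg/src/arrow/value.rs).
#[derive(Debug, PartialEq)]
enum SimpleLiteral {
    Int(i64),
    Struct(Vec<SimpleLiteral>),
}

enum SimpleArray {
    Int(Vec<i64>),
    Struct(Vec<SimpleArray>),
}

// The visitor only decides what to produce; the traversal is shared.
trait ArrayVisitor {
    type T;
    fn int(&self, values: &[i64]) -> Self::T;
    fn r#struct(&self, children: Vec<Self::T>) -> Self::T;
}

fn visit<V: ArrayVisitor>(array: &SimpleArray, visitor: &V) -> V::T {
    match array {
        SimpleArray::Int(v) => visitor.int(v),
        SimpleArray::Struct(cols) => {
            // visit child columns first, then combine at the struct level
            let children = cols.iter().map(|c| visit(c, visitor)).collect();
            visitor.r#struct(children)
        }
    }
}

// One concrete visitor producing literals; a datum-producing visitor
// would implement the same trait with a different `T`.
struct ToLiteral;

impl ArrayVisitor for ToLiteral {
    type T = SimpleLiteral;
    fn int(&self, values: &[i64]) -> SimpleLiteral {
        // take the first row's value, just for illustration
        SimpleLiteral::Int(values[0])
    }
    fn r#struct(&self, children: Vec<SimpleLiteral>) -> SimpleLiteral {
        SimpleLiteral::Struct(children)
    }
}
```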
Thanks @ZENOTME for this PR. I have some suggestions about it.
crates/iceberg/src/arrow/value.rs (outdated):
```rust
    &self,
    array: &NullArray,
    arrow_type: &DataType,
    iceberg_type: &PrimitiveType,
```
I'm hesitant to add an `iceberg_type` here.
I also don't think we need an `arrow_type`; an arrow array already has its data type associated with it: https://docs.rs/arrow/latest/arrow/array/trait.Array.html#tymethod.data_type
crates/iceberg/src/arrow/value.rs (outdated):
```rust
use crate::spec::{Literal, PrimitiveType, Struct, StructType, Type};
use crate::{Error, ErrorKind, Result};

trait ArrowArrayVistor {
```
We need to have clear documentation about what kind of visitor this is. Is it a post-order visitor?
In Iceberg, everything is post-order, except assigning field-IDs. I think post-order makes sense in this case, since you would first convert the fields of the struct, and then move up through nested structs, lists, and maps, all the way up to the schema struct.
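A minimal sketch of what post-order means here (toy hypothetical types, not the real schema/array types): child fields are finished before the callback for their enclosing struct fires.

```rust
// A toy schema tree; in the real visitor the nodes would be iceberg
// types paired with arrow array partners.
enum Node {
    Leaf(&'static str),
    Struct(&'static str, Vec<Node>),
}

// Post-order traversal: record each node's name only after all of its
// children have been recorded.
fn visit_post_order(node: &Node, order: &mut Vec<&'static str>) {
    match node {
        Node::Leaf(name) => order.push(name),
        Node::Struct(name, children) => {
            for child in children {
                visit_post_order(child, order); // fields first
            }
            order.push(name); // then the enclosing struct
        }
    }
}
```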
arrow_struct_to_iceberg_struct
crates/iceberg/src/arrow/value.rs (outdated):
```rust
let mut columns = Vec::with_capacity(array.columns().len());

for ((array, arrow_type), iceberg_field) in array
```
Looping back to @liurenjie1024's question here:
> We need have clear documentation about what kind of visitor it is.
First of all, thanks for converting this into a visitor; I think that's the right approach here. One question regarding the lookup: it looks like we're expecting the Arrow data to be in the same position as the Iceberg schema. What would be more Iceberg-esque is performing the lookup by field-ID.
For example:
```python
schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)
```
```
table {
  1: city: optional string
  2: lat: optional double
  3: long: optional double
}
```
```python
table = catalog.create_table("default.locations", schema)

with table.update_schema() as update:
    update.move_after("lat", "long")
```
```
table {
  1: city: optional string
  3: lat: optional double
  2: long: optional double
}
```
Now the fields are switched. Data that's already written needs to be projected, while new data can be read as-is. Both kinds of files would be read by this visitor without raising an error, but for the old files the fields would be switched.
This way, we could also do smart things, like fill in nullable fields:
```rust
let struct_array = StructArray::new_null();
let iceberg_struct_type = StructType::new(vec![Arc::new(NestedField::optional(
    0,
    "bool_field",
    Type::Primitive(PrimitiveType::Boolean),
))]);
let result = arrow_struct_to_literal(&struct_array, iceberg_struct_type).unwrap();
assert_eq!(result, vec![None; 3]);
```
And in a step after that, #737. You can find the Python equivalent here.
Using field-IDs requires the user to store field-id metadata on each data type in the struct type, and I'm not sure this is always set up. How about providing two visit functions? One would assume the arrow schema is the same as the iceberg schema, returning an error if the conversion fails; the other would look up columns using the field id.
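A toy sketch of the two lookup strategies under discussion (invented types and ids, mirroring the city/lat/long example above): positional lookup trusts column order, while field-id lookup survives reordering.

```rust
// Hypothetical simplified field descriptor; the real code would carry
// the iceberg NestedField and the arrow column.
struct Field {
    id: i32,
    name: &'static str,
}

// Strategy 1: assume arrow columns line up with the iceberg schema.
fn lookup_positional<'a>(fields: &'a [Field], index: usize) -> Option<&'a Field> {
    fields.get(index)
}

// Strategy 2: locate the column by its iceberg field id, ignoring position.
fn lookup_by_field_id<'a>(fields: &'a [Field], id: i32) -> Option<&'a Field> {
    fields.iter().find(|f| f.id == id)
}
```

After the `move_after("lat", "long")` evolution above, the two strategies disagree: position 1 holds a different field than id 2 does.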
> And the other one will look up the column using the field id.

+1. I think we should allow the user to choose.
Thanks for @ZENOTME's effort. I saw that both Java and Python have a schema-with-partner visitor:
- SchemaWithPartnerVisitor
- https://github.com/apache/iceberg-python/blob/bfc0d9a62176803094da0867ee793808f105d352/pyiceberg/io/pyarrow.py#L1726
I'm thinking that we should adopt a similar design?
crates/iceberg/src/arrow/value.rs (outdated):
```rust
fn int16(&self, array: &Int16Array, iceberg_type: &PrimitiveType) -> Result<Vec<Self::T>>;
fn int32(&self, array: &Int32Array, iceberg_type: &PrimitiveType) -> Result<Vec<Self::T>>;
fn int64(&self, array: &Int64Array, iceberg_type: &PrimitiveType) -> Result<Vec<Self::T>>;
fn float(&self, array: &Float32Array, iceberg_type: &PrimitiveType) -> Result<Vec<Self::T>>;
fn double(&self, array: &Float64Array, iceberg_type: &PrimitiveType) -> Result<Vec<Self::T>>;
fn decimal(
```
Should we consider condensing these into one method? Listing all primitive types seems too cumbersome to me.
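A sketch of what the condensed interface could look like (simplified hypothetical types, not the actual trait): a single `primitive` method takes one type-erased primitive array and dispatches internally, replacing the per-type methods.

```rust
// Toy stand-in for a type-erased arrow primitive array; the real code
// would downcast `&dyn Array` based on its DataType instead.
enum PrimitiveArray {
    Int32(Vec<i32>),
    Int64(Vec<i64>),
    Float64(Vec<f64>),
}

// One method with one match replaces a dozen near-identical trait
// methods (int16, int32, int64, float, double, ...).
fn primitive(array: &PrimitiveArray) -> Vec<String> {
    match array {
        PrimitiveArray::Int32(v) => v.iter().map(|x| x.to_string()).collect(),
        PrimitiveArray::Int64(v) => v.iter().map(|x| x.to_string()).collect(),
        PrimitiveArray::Float64(v) => v.iter().map(|x| x.to_string()).collect(),
    }
}
```

The trade-off is that implementors lose per-type method signatures and must handle the dispatch (or reject unsupported types) themselves.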
crates/iceberg/src/arrow/value.rs (outdated):
```rust
/// A post order arrow array visitor.
/// # TODO
/// - Add support for ListArray, MapArray
```
I can understand that we don't want to support `ListArray` and `MapArray` for now, but we should return an unsupported-feature error in the implementation. As an interface, we should still take those types into account.
I tried to adopt the SchemaWithPartner design, but I find the interface is not so general. https://github.com/apache/iceberg-python/blob/bfc0d9a62176803094da0867ee793808f105d352/pyiceberg/io/pyarrow.py#L1726 is not for array conversion, I think. To make it suitable for arrow array conversion, I made some changes to it:
I think we can still implement, e.g., arrow array projection and casting using the changed interface, but I'm not sure whether the change is reasonable. There are some errors in this PR; I will fix them if we reach a consensus on this design.
```rust
/// Called before every type. If this function returns `Some`, the following visiting will be skipped.
/// This function is used to implement early return.
fn visit_type_before(&mut self, _ty: &Type, _partner: &P) -> Result<Option<Self::T>> {
```
I'm not sure we should add this part. I think this is an optimization to prune some fields, right? Instead of adding this, why don't we prune unnecessary fields before visiting?
> why we don't prune unnecessary fields before visiting?

I think that pruning would also incur recursive visits. Let me explain why we need this. In arrow, a struct array may be represented as a `NullArray`, which means all of its child fields are null. In this case, we need to recognize that and stop visiting the child columns. But in our visitor pattern, the visitor can't control this: the control path is in the visit function rather than in the visitor. So we need this function to notify the visit function to stop visiting child columns when it returns `Some`.
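A minimal sketch of that early-return mechanism (hypothetical simplified types): the driver calls `visit_type_before` first, and a `Some` result short-circuits the descent into child columns.

```rust
// Toy partner: either an all-null placeholder (like arrow's NullArray
// standing in for a struct) or a struct with real child columns.
enum Partner {
    AllNull(usize),
    Struct(Vec<Vec<Option<i64>>>),
}

// Hook called before visiting a node; `Some` means "use this result and
// do not descend into children".
fn visit_type_before(partner: &Partner) -> Option<Vec<Option<i64>>> {
    match partner {
        Partner::AllNull(len) => Some(vec![None; *len]),
        _ => None,
    }
}

// Driver: the control logic lives here (in the visit function), not in
// the visitor, which is why the hook is needed.
fn visit(partner: &Partner) -> Vec<Option<i64>> {
    if let Some(early) = visit_type_before(partner) {
        return early; // skip visiting child columns entirely
    }
    match partner {
        // placeholder for the real recursive descent into children
        Partner::Struct(cols) => cols[0].clone(),
        Partner::AllNull(len) => vec![None; *len], // unreachable after the hook
    }
}
```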
```rust
    results: Vec<Self::T>,
) -> Result<Self::T>;

/// Called after list fields visited.
fn list(&mut self, list: &ListType, partner: &P, value: Vec<Self::T>) -> Result<Self::T>;
```
Why is the value a `Vec`?
In the visit function, this method is called after visiting all child columns. So the `Vec` holds the results returned by all child columns, and this method may then combine those results.
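A toy illustration of that post-order contract for lists (hypothetical types; in the real code the offsets come from the arrow list array): the element column is visited first, and `list` then regroups the flat element results into per-row values using the offsets.

```rust
// `element_results`: the already-visited results for every element in the
// flattened child column. `offsets`: list boundaries, arrow-style, where
// row i spans element_results[offsets[i]..offsets[i + 1]].
fn list(element_results: Vec<i64>, offsets: &[usize]) -> Vec<Vec<i64>> {
    offsets
        .windows(2)
        .map(|w| element_results[w[0]..w[1]].to_vec())
        .collect()
}
```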
```rust
    &mut self,
    map: &MapType,
    partner: &P,
    key_value: Vec<Self::T>,
```
Same question as list.
Similar to `struct`: in the visit function, this method is called after visiting all rows. So the `Vec` holds the results returned for all rows, and this method may then combine those results.
```rust
Type::List(list) => {
    let mut results = Vec::new();
    let mut list_element_partner_iter = accessor.list_element_partner(partner)?;
    if let Some(list_element_partner) = list_element_partner_iter.next() {
```
This doesn't seem quite efficient. It should be the actual visitor's responsibility to visit the partner. If I understand correctly, this is because you have trouble dealing with arrow's list array? In that case, you can get the inner child array by using `array_data()[0]`.
> This doesn't seem quite efficient. It should be the actual visitor's responsibility to visit the partner.

For now, our visit pattern puts the control logic in the visit function; the visitor only takes care of how to access the data. If we make it the actual visitor's responsibility to visit the partner, it needs to know which partner to access, e.g. it needs to know some index, I think. 🤔 The interface would then look like:
```rust
fn before_list_element(&mut self, _field: &NestedFieldRef, _partner: &P, element_idx: usize) -> Result<()> {
    Ok(())
}
```
Is that what you mean?
Thanks @ZENOTME for this PR. I have some concerns about the processing of list/map.
This PR introduces a function to convert an arrow struct to an iceberg struct. This function is needed when we add the fanout partition writer: in that writer, we need to compute the partition values from a record batch, convert them into struct values, and finally set them on the data file.
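The motivating flow can be sketched roughly as follows (all names and types hypothetical, far simpler than the real fanout writer): compute a partition key per row, group rows by it, and attach each group's partition value to its data file.

```rust
use std::collections::HashMap;

// Toy fanout step: each row carries a precomputed partition value and a
// payload. In the real writer, the key would be the iceberg struct value
// converted from the arrow partition columns, and each group would be
// flushed to its own data file with that partition value recorded.
fn fanout_by_partition(rows: &[(i64, &'static str)]) -> HashMap<i64, Vec<&'static str>> {
    let mut partitions: HashMap<i64, Vec<&'static str>> = HashMap::new();
    for &(partition_value, payload) in rows {
        partitions.entry(partition_value).or_default().push(payload);
    }
    partitions
}
```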