Skip to content

Commit

Permalink
update API
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Dec 2, 2024
1 parent 326101d commit be70e93
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 25 deletions.
9 changes: 3 additions & 6 deletions datafusion/functions-aggregate/src/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow::datatypes::DataType;

use arrow_schema::{Field, Fields};
use datafusion_common::cast::as_list_array;
use datafusion_common::utils::{array_into_list_array_nullable, get_row_at_idx};
use datafusion_common::utils::{get_row_at_idx, SingleRowArrayBuilder};
use datafusion_common::{exec_err, ScalarValue};
use datafusion_common::{internal_err, Result};
use datafusion_expr::aggregate_doc_sections::DOC_SECTION_GENERAL;
Expand Down Expand Up @@ -238,9 +238,8 @@ impl Accumulator for ArrayAggAccumulator {
}

let concated_array = arrow::compute::concat(&element_arrays)?;
let list_array = array_into_list_array_nullable(concated_array);

Ok(ScalarValue::List(Arc::new(list_array)))
Ok(SingleRowArrayBuilder::new(concated_array).build_list_scalar())
}

fn size(&self) -> usize {
Expand Down Expand Up @@ -530,9 +529,7 @@ impl OrderSensitiveArrayAggAccumulator {

let ordering_array =
StructArray::try_new(struct_field, column_wise_ordering_values, None)?;
Ok(ScalarValue::List(Arc::new(array_into_list_array_nullable(
Arc::new(ordering_array),
))))
Ok(SingleRowArrayBuilder::new(Arc::new(ordering_array)).build_list_scalar())
}
}

Expand Down
6 changes: 2 additions & 4 deletions datafusion/functions-aggregate/src/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::sync::{Arc, OnceLock};
use arrow::array::{new_empty_array, ArrayRef, AsArray, StructArray};
use arrow_schema::{DataType, Field, Fields};

use datafusion_common::utils::{array_into_list_array_nullable, get_row_at_idx};
use datafusion_common::utils::{get_row_at_idx, SingleRowArrayBuilder};
use datafusion_common::{exec_err, internal_err, not_impl_err, Result, ScalarValue};
use datafusion_expr::aggregate_doc_sections::DOC_SECTION_STATISTICAL;
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
Expand Down Expand Up @@ -423,9 +423,7 @@ impl NthValueAccumulator {
let ordering_array =
StructArray::try_new(struct_field, column_wise_ordering_values, None)?;

Ok(ScalarValue::List(Arc::new(array_into_list_array_nullable(
Arc::new(ordering_array),
))))
Ok(SingleRowArrayBuilder::new(Arc::new(ordering_array)).build_list_scalar())
}

fn evaluate_values(&self) -> ScalarValue {
Expand Down
7 changes: 5 additions & 2 deletions datafusion/functions-nested/src/make_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use arrow_array::{
use arrow_buffer::OffsetBuffer;
use arrow_schema::DataType::{List, Null};
use arrow_schema::{DataType, Field};
use datafusion_common::{plan_err, utils::array_into_list_array_nullable, Result};
use datafusion_common::utils::SingleRowArrayBuilder;
use datafusion_common::{plan_err, Result};
use datafusion_expr::binary::{
try_type_union_resolution_with_struct, type_union_resolution,
};
Expand Down Expand Up @@ -194,7 +195,9 @@ pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
let length = arrays.iter().map(|a| a.len()).sum();
// By default Int64
let array = new_null_array(&DataType::Int64, length);
Ok(Arc::new(array_into_list_array_nullable(array)))
Ok(Arc::new(
SingleRowArrayBuilder::new(array).build_list_array(),
))
}
_ => array_array::<i32>(arrays, data_type),
}
Expand Down
27 changes: 14 additions & 13 deletions datafusion/functions-nested/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,27 +274,27 @@ pub(crate) fn get_map_entry_field(data_type: &DataType) -> Result<&Fields> {
mod tests {
use super::*;
use arrow::datatypes::Int64Type;
use datafusion_common::utils::array_into_list_array_nullable;
use datafusion_common::utils::SingleRowArrayBuilder;

/// Only test internal functions, array-related sql functions will be tested in sqllogictest `array.slt`
#[test]
fn test_align_array_dimensions() {
let array1d_1 =
let array1d_1: ArrayRef =
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(vec![Some(1), Some(2), Some(3)]),
Some(vec![Some(4), Some(5)]),
]));
let array1d_2 =
let array1d_2: ArrayRef =
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(vec![Some(6), Some(7), Some(8)]),
]));

let array2d_1 = Arc::new(array_into_list_array_nullable(
Arc::clone(&array1d_1) as ArrayRef
)) as ArrayRef;
let array2d_2 = Arc::new(array_into_list_array_nullable(
Arc::clone(&array1d_2) as ArrayRef
)) as ArrayRef;
let array2d_1: ArrayRef = Arc::new(
SingleRowArrayBuilder::new(Arc::clone(&array1d_1)).build_list_array(),
);
let array2d_2 = Arc::new(
SingleRowArrayBuilder::new(Arc::clone(&array1d_2)).build_list_array(),
);

let res = align_array_dimensions::<i32>(vec![
array1d_1.to_owned(),
Expand All @@ -310,10 +310,11 @@ mod tests {
expected_dim
);

let array3d_1 = Arc::new(array_into_list_array_nullable(array2d_1)) as ArrayRef;
let array3d_2 = array_into_list_array_nullable(array2d_2.to_owned());
let res =
align_array_dimensions::<i32>(vec![array1d_1, Arc::new(array3d_2)]).unwrap();
let array3d_1: ArrayRef =
Arc::new(SingleRowArrayBuilder::new(array2d_1).build_list_array());
let array3d_2: ArrayRef =
Arc::new(SingleRowArrayBuilder::new(array2d_2).build_list_array());
let res = align_array_dimensions::<i32>(vec![array1d_1, array3d_2]).unwrap();

let expected = as_list_array(&array3d_1).unwrap();
let expected_dim = datafusion_common::utils::list_ndims(array3d_1.data_type());
Expand Down

0 comments on commit be70e93

Please sign in to comment.