Skip to content

Commit

Permalink
Create ArrayScalarBuilder for creating single element List arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Dec 2, 2024
1 parent 86740bf commit 326101d
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 38 deletions.
41 changes: 22 additions & 19 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ use crate::error::{DataFusionError, Result, _exec_err, _internal_err, _not_impl_
use crate::hash_utils::create_hashes;
use crate::utils::{
array_into_fixed_size_list_array_with_field_name, array_into_large_list_array,
array_into_large_list_array_with_field_name, array_into_list_array,
array_into_list_array_with_field_name,
array_into_large_list_array_with_field_name, array_into_list_array_with_field_name,
SingleRowArrayBuilder,
};
use arrow::compute::kernels::numeric::*;
use arrow::util::display::{array_value_to_string, ArrayFormatter, FormatOptions};
Expand Down Expand Up @@ -2092,7 +2092,11 @@ impl ScalarValue {
} else {
Self::iter_to_array(values.iter().cloned()).unwrap()
};
Arc::new(array_into_list_array(values, nullable))
Arc::new(
SingleRowArrayBuilder::new(values)
.with_nullable(nullable)
.build_list_array(),
)
}

/// Same as [`ScalarValue::new_list`] but with nullable set to true.
Expand Down Expand Up @@ -2148,7 +2152,11 @@ impl ScalarValue {
} else {
Self::iter_to_array(values).unwrap()
};
Arc::new(array_into_list_array(values, nullable))
Arc::new(
SingleRowArrayBuilder::new(values)
.with_nullable(nullable)
.build_list_array(),
)
}

/// Converts `Vec<ScalarValue>` where each element has type corresponding to
Expand Down Expand Up @@ -3895,7 +3903,6 @@ mod tests {
};

use crate::assert_batches_eq;
use crate::utils::array_into_list_array_nullable;
use arrow::buffer::OffsetBuffer;
use arrow::compute::{is_null, kernels};
use arrow::error::ArrowError;
Expand Down Expand Up @@ -4071,14 +4078,14 @@ mod tests {

let result = ScalarValue::new_list_nullable(scalars.as_slice(), &DataType::Utf8);

let expected = array_into_list_array_nullable(Arc::new(StringArray::from(vec![
"rust",
"arrow",
"data-fusion",
])));
let expected = single_row_list_array(vec!["rust", "arrow", "data-fusion"]);
assert_eq!(*result, expected);
}

fn single_row_list_array(items: Vec<&str>) -> ListArray {
SingleRowArrayBuilder::new(Arc::new(StringArray::from(items))).build_list_array()
}

fn build_list<O: OffsetSizeTrait>(
values: Vec<Option<Vec<Option<i64>>>>,
) -> Vec<ScalarValue> {
Expand Down Expand Up @@ -4283,12 +4290,8 @@ mod tests {

#[test]
fn iter_to_array_string_test() {
let arr1 = array_into_list_array_nullable(Arc::new(StringArray::from(vec![
"foo", "bar", "baz",
])));
let arr2 = array_into_list_array_nullable(Arc::new(StringArray::from(vec![
"rust", "world",
])));
let arr1 = single_row_list_array(vec!["foo", "bar", "baz"]);
let arr2 = single_row_list_array(vec!["rust", "world"]);

let scalars = vec![
ScalarValue::List(Arc::new(arr1)),
Expand Down Expand Up @@ -5745,13 +5748,13 @@ mod tests {
// Define list-of-structs scalars

let nl0_array = ScalarValue::iter_to_array(vec![s0, s1.clone()]).unwrap();
let nl0 = ScalarValue::List(Arc::new(array_into_list_array_nullable(nl0_array)));
let nl0 = SingleRowArrayBuilder::new(nl0_array).build_list_scalar();

let nl1_array = ScalarValue::iter_to_array(vec![s2]).unwrap();
let nl1 = ScalarValue::List(Arc::new(array_into_list_array_nullable(nl1_array)));
let nl1 = SingleRowArrayBuilder::new(nl1_array).build_list_scalar();

let nl2_array = ScalarValue::iter_to_array(vec![s1]).unwrap();
let nl2 = ScalarValue::List(Arc::new(array_into_list_array_nullable(nl2_array)));
let nl2 = SingleRowArrayBuilder::new(nl2_array).build_list_scalar();

// iter_to_array for list-of-struct
let array = ScalarValue::iter_to_array(vec![nl0, nl1, nl2]).unwrap();
Expand Down
67 changes: 59 additions & 8 deletions datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,23 +321,74 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(
count
}

/// Creates single element [`ListArray`], [`LargeListArray`] and
/// [`FixedListArray`] from other arrays
///
/// For example this builder can convert `[1, 2, 3]` into `[[1, 2, 3]]`
///
/// # Example
/// ```
/// todo!()
/// ```
#[derive(Debug, Clone)]
pub struct SingleRowArrayBuilder {
/// array to be wrapped
arr: ArrayRef,
/// Should the resulting array be nullable (allow null entries)? Defaults to
/// `true`.
nullable: bool,
}

impl SingleRowArrayBuilder {
/// Create a new instance of `SingleRowArrayBuilder`
pub fn new(arr: ArrayRef) -> Self {
Self {
arr,
nullable: true,
}
}

/// Set the nullable flag
pub fn with_nullable(mut self, nullable: bool) -> Self {
self.nullable = nullable;
self
}

/// Build a single element `ListArray`
pub fn build_list_array(self) -> ListArray {
let Self { arr, nullable } = self;
let offsets = OffsetBuffer::from_lengths([arr.len()]);
ListArray::new(
Arc::new(Field::new_list_field(arr.data_type().to_owned(), nullable)),
offsets,
arr,
None,
)
}

/// Build a single element `ListArray` and wrap as `ScalarValue::List`
pub fn build_list_scalar(self) -> ScalarValue {
ScalarValue::List(Arc::new(self.build_list_array()))
}
}

/// Wrap an array into a single element `ListArray`.
/// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`
/// The field in the list array is nullable.
#[deprecated(since = "44.0.0", note = "please use `SingleRowArrayBuilder` instead")]
pub fn array_into_list_array_nullable(arr: ArrayRef) -> ListArray {
array_into_list_array(arr, true)
SingleRowArrayBuilder::new(arr)
.with_nullable(true)
.build_list_array()
}

/// Wrap an array into a single element `ListArray`.
/// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`
#[deprecated(since = "44.0.0", note = "please use `SingleRowArrayBuilder` instead")]
pub fn array_into_list_array(arr: ArrayRef, nullable: bool) -> ListArray {
let offsets = OffsetBuffer::from_lengths([arr.len()]);
ListArray::new(
Arc::new(Field::new_list_field(arr.data_type().to_owned(), nullable)),
offsets,
arr,
None,
)
SingleRowArrayBuilder::new(arr)
.with_nullable(nullable)
.build_list_array()
}

pub fn array_into_list_array_with_field_name(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
use arrow::array::{ArrayRef, OffsetSizeTrait};
use datafusion_common::cast::as_list_array;
use datafusion_common::utils::array_into_list_array_nullable;
use datafusion_common::utils::SingleRowArrayBuilder;
use datafusion_common::ScalarValue;
use datafusion_expr_common::accumulator::Accumulator;
use datafusion_physical_expr_common::binary_map::{ArrowBytesSet, OutputType};
use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewSet;
use std::fmt::Debug;
use std::mem::size_of_val;
use std::sync::Arc;

/// Specialized implementation of
/// `COUNT DISTINCT` for [`StringArray`] [`LargeStringArray`],
Expand All @@ -49,8 +48,7 @@ impl<O: OffsetSizeTrait> Accumulator for BytesDistinctCountAccumulator<O> {
fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
let set = self.0.take();
let arr = set.into_state();
let list = Arc::new(array_into_list_array_nullable(arr));
Ok(vec![ScalarValue::List(list)])
Ok(vec![SingleRowArrayBuilder::new(arr).build_list_scalar()])
}

fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
Expand Down Expand Up @@ -109,8 +107,7 @@ impl Accumulator for BytesViewDistinctCountAccumulator {
fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
let set = self.0.take();
let arr = set.into_state();
let list = Arc::new(array_into_list_array_nullable(arr));
Ok(vec![ScalarValue::List(list)])
Ok(vec![SingleRowArrayBuilder::new(arr).build_list_scalar()])
}

fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use arrow::array::PrimitiveArray;
use arrow::datatypes::DataType;

use datafusion_common::cast::{as_list_array, as_primitive_array};
use datafusion_common::utils::array_into_list_array_nullable;
use datafusion_common::utils::memory::estimate_memory_size;
use datafusion_common::utils::SingleRowArrayBuilder;
use datafusion_common::ScalarValue;
use datafusion_expr_common::accumulator::Accumulator;

Expand Down Expand Up @@ -73,8 +73,7 @@ where
PrimitiveArray::<T>::from_iter_values(self.values.iter().cloned())
.with_data_type(self.data_type.clone()),
);
let list = Arc::new(array_into_list_array_nullable(arr));
Ok(vec![ScalarValue::List(list)])
Ok(vec![SingleRowArrayBuilder::new(arr).build_list_scalar()])
}

fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
Expand Down Expand Up @@ -160,8 +159,7 @@ where
let arr = Arc::new(PrimitiveArray::<T>::from_iter_values(
self.values.iter().map(|v| v.0),
)) as ArrayRef;
let list = Arc::new(array_into_list_array_nullable(arr));
Ok(vec![ScalarValue::List(list)])
Ok(vec![SingleRowArrayBuilder::new(arr).build_list_scalar()])
}

fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
Expand Down

0 comments on commit 326101d

Please sign in to comment.