-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement GroupsAccumulator for corr(x,y) aggregate function #13581
base: main
Are you sure you want to change the base?
Conversation
This looks amazing -- thank you @2010YOUY01 I plan to review it over the next day or two It seems like maybe we should add the data generator for h2o benchmark to the bench.sh script 🤔 |
let nulls = arr | ||
.nulls() | ||
.expect("If null_count() > 0, nulls must be present"); | ||
match combined_nulls { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If passing combined_nulls
to NullBuffer::union
it will take care of handling Option
T: ArrowPrimitiveType + Send, | ||
F: FnMut(usize, &[T::Native]) + Send, | ||
{ | ||
let acc_cols: Vec<&[T::Native]> = value_columns |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think collecting into Vec
might not be necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is true, I have updated
0d6e2c7
to
380ef0a
Compare
for (idx, &group_idx) in group_indices.iter().enumerate() { | ||
// Get `idx`-th row from all value(accumulate) columns | ||
let row_values: Vec<_> = | ||
value_columns.iter().map(|col| col.value(idx)).collect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to avoid collecting
here? Can we take an iterator instead in value_fn
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The bench query above went from 4s -> 3s after this change! Great catch
Though I have to structure the code differently to make it compile, it's a bit more complex so I added more comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
self.sum_xx.resize(total_num_groups, 0.0); | ||
self.sum_yy.resize(total_num_groups, 0.0); | ||
|
||
let array_x = &cast(&values[0], &DataType::Float64)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think casting should be handled in logical optimizer. Fixing the signature of Correlation
might helps
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, this line is redundant
Addressed in 98cba91
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to remove it, all tests passed but the benchmark query above won't run 🤔 The existing signature looks correct to me, this might need further investigation
Issued #13721
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs
Outdated
Show resolved
Hide resolved
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs
Outdated
Show resolved
Hide resolved
Hi, I think this one is pretty close, do you have time to look at the review comments @2010YOUY01 ? |
Yes, I will be back and finish this PR in next 2 days, I'm traveling and afk this week. Thanks for the attention to this ticket |
I also harbor hopes of contributing a benchmark for (to be clear not for this PR) |
Thank you all for the review, it's ready for another look |
Very nice work @2010YOUY01 |
Which issue does this PR close?
Closes #13549
Rationale for this change
Implement
GroupsAccumulator
forcorr
aggregation function, for better performance when group cardinality is highI rerun the H2o benchmark:
Data Generation
falsa groupby --path-prefix=/Users/yongting/data/ --size MEDIUM --data-format PARQUET
https://github.com/mrpowers-io/falsa
Run benchmark in datafusion-cli
Result
Main: 12s
This PR: 4s
(On my MacBook with m4 pro)
Remaining tasks
ImplementThis requires changes in aggregate fuzzer for test coverage, which can be done later to keep this PR smallconvert_to_states()
What changes are included in this PR?
accumulate_multiple
andaccumulate_correlation_states
to accumulate states in correlation function. (existing util functions is for aggregate functions with 1 input expravg(expr1) v.s. corr(expr1, expr2)
)GroupsAccumulator
forcorr()
Are these changes tested?
Unit tests for util functions
corr()
is covered by existing testsAre there any user-facing changes?
No