Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement GroupsAccumulator for corr(x,y) aggregate function #13581

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

2010YOUY01
Copy link
Contributor

@2010YOUY01 2010YOUY01 commented Nov 27, 2024

Which issue does this PR close?

Closes #13549

Rationale for this change

Implement GroupsAccumulator for corr aggregation function, for better performance when group cardinality is high

I rerun the H2o benchmark:

Data Generation

falsa groupby --path-prefix=/Users/yongting/data/ --size MEDIUM --data-format PARQUET
https://github.com/mrpowers-io/falsa

Run benchmark in datafusion-cli

CREATE EXTERNAL TABLE IF NOT EXISTS h2o_100m (
    id1 VARCHAR NOT NULL,
    id2 VARCHAR NOT NULL,
    id3 VARCHAR NOT NULL,
    id4 INTEGER NOT NULL,
    id5 INTEGER NOT NULL,
    id6 INTEGER NOT NULL,
    v1 INTEGER NOT NULL,
    v2 INTEGER NOT NULL,
    v3 DOUBLE PRECISION NOT NULL
)
STORED AS parquet
LOCATION '/Users/yongting/data/G1_1e8_1e8_100_0.parquet';

select id2, id4, power(corr(v1, v2), 2) as r2 from h2o_100m group by id2, id4;

Result

Main: 12s
This PR: 4s
(On my MacBook with m4 pro)


Remaining tasks
Implement convert_to_states() This requires changes in aggregate fuzzer for test coverage, which can be done later to keep this PR small

What changes are included in this PR?

  1. Implement two utility functions: accumulate_multiple and accumulate_correlation_states to accumulate states in correlation function. (existing util functions is for aggregate functions with 1 input expr avg(expr1) v.s. corr(expr1, expr2))
  2. Implement GroupsAccumulator for corr()

Are these changes tested?

Unit tests for util functions
corr() is covered by existing tests

Are there any user-facing changes?

No

@alamb
Copy link
Contributor

alamb commented Nov 27, 2024

This looks amazing -- thank you @2010YOUY01

I plan to review it over the next day or two

It seems like maybe we should add the data generator for h2o benchmark to the bench.sh script 🤔

let nulls = arr
.nulls()
.expect("If null_count() > 0, nulls must be present");
match combined_nulls {
Copy link
Contributor

@Dandandan Dandandan Nov 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If passing combined_nulls to NullBuffer::union it will take care of handling Option

T: ArrowPrimitiveType + Send,
F: FnMut(usize, &[T::Native]) + Send,
{
let acc_cols: Vec<&[T::Native]> = value_columns
Copy link
Contributor

@Dandandan Dandandan Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think collecting into Vec might not be necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true, I have updated

for (idx, &group_idx) in group_indices.iter().enumerate() {
// Get `idx`-th row from all value(accumulate) columns
let row_values: Vec<_> =
value_columns.iter().map(|col| col.value(idx)).collect();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to avoid collecting here? Can we take an iterator instead in value_fn?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bench query above went from 4s -> 3s after this change! Great catch
Though I have to structure the code differently to make it compile, it's a bit more complex so I added more comment

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

self.sum_xx.resize(total_num_groups, 0.0);
self.sum_yy.resize(total_num_groups, 0.0);

let array_x = &cast(&values[0], &DataType::Float64)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think casting should be handled in logical optimizer. Fixing the signature of Correlation might helps

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, this line is redundant
Addressed in 98cba91

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to remove it, all tests passed but the benchmark query above won't run 🤔 The existing signature looks correct to me, this might need further investigation
Issued #13721

@Dandandan
Copy link
Contributor

Hi, I think this one is pretty close, do you have time to look at the review comments @2010YOUY01 ?

@2010YOUY01
Copy link
Contributor Author

Hi, I think this one is pretty close, do you have time to look at the review comments @2010YOUY01 ?

Yes, I will be back and finish this PR in next 2 days, I'm traveling and afk this week. Thanks for the attention to this ticket

@alamb
Copy link
Contributor

alamb commented Dec 7, 2024

I also harbor hopes of contributing a benchmark for corr, hopefully

(to be clear not for this PR)

@2010YOUY01
Copy link
Contributor Author

Thank you all for the review, it's ready for another look

@Dandandan
Copy link
Contributor

Very nice work @2010YOUY01

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Improve performance of corr function
4 participants