Sort by dw_last_updated before materializing #334
base: main
@@ -1,4 +1,6 @@
import datetime as dt
import pyarrow as pa
from deltacat.constants import DW_LAST_UPDATED_COLUMN_NAME
from deltacat.tests.compute.test_util_common import (
    PartitionKey,
    PartitionKeyType,
@@ -65,20 +67,68 @@ class RebaseCompactionTestCaseParams(BaseCompactorTestCase):
            ],
            names=["pk_col_1", "sk_col_1", "sk_col_2", "col_1"],
        ),
        expected_terminal_compact_partition_result=pa.Table.from_arrays(
        expected_terminal_compact_partition_result=pa.Table.from_arrays([]),
        expected_terminal_exception=None,
        expected_terminal_exception_message=None,
        do_create_placement_group=False,
        records_per_compacted_file=DEFAULT_MAX_RECORDS_PER_FILE,
        hash_bucket_count=DEFAULT_HASH_BUCKET_COUNT,
        read_kwargs_provider=None,
        drop_duplicates=True,
        skip_enabled_compact_partition_drivers=[CompactorVersion.V1],
    ),
"2-rebase-sort": RebaseCompactionTestCaseParams( | ||
primary_keys={"pk_col_1"}, | ||
sort_keys=[ | ||
SortKey.of(key_name="sk_col_1"), | ||
SortKey.of(key_name="sk_col_2"), | ||
], | ||
partition_keys=[PartitionKey.of("region_id", PartitionKeyType.INT)], | ||
partition_values=["1"], | ||
input_deltas=pa.Table.from_arrays( | ||
[ | ||
pa.array([str(i) for i in range(10)]), | ||
pa.array([i for i in range(20, 30)]), | ||
pa.array([i for i in range(0, 10)]), | ||
pa.array(["foo"] * 10), | ||
pa.array([i / 10 for i in range(40, 50)]), | ||
pa.array([i / 10 for i in range(10, 20)]), | ||
pa.array(dt.datetime(year, 1, 1) for year in range(2000, 2010)), | ||
], | ||
names=[ | ||
"pk_col_1", | ||
"sk_col_1", | ||
"sk_col_2", | ||
"col_1", | ||
DW_LAST_UPDATED_COLUMN_NAME, | ||
], | ||
), | ||
        input_deltas_delta_type=DeltaType.UPSERT,
        # dw_last_updated is in ascending order in the input table.
        # Expect a descending sort on dw_last_updated within each hash bucket.
        # Since there is only one hash bucket, the order of input rows should be reversed.
        rebase_expected_compact_partition_result=pa.Table.from_arrays(
            [
                pa.array([str(i) for i in reversed(range(10))]),
                pa.array([i for i in reversed(range(0, 10))]),
                pa.array(["foo"] * 10),
                pa.array([i / 10 for i in reversed(range(10, 20))]),
                pa.array(
                    dt.datetime(year, 1, 1) for year in reversed(range(2000, 2010))
                ),
            ],
            names=[
                "pk_col_1",
                "sk_col_1",
                "sk_col_2",
                "col_1",
                DW_LAST_UPDATED_COLUMN_NAME,
            ],
            names=["pk_col_1", "sk_col_1", "sk_col_2", "col_1"],
        ),
        expected_terminal_compact_partition_result=pa.Table.from_arrays([]),
        expected_terminal_exception=None,
        expected_terminal_exception_message=None,
        do_create_placement_group=False,
        records_per_compacted_file=DEFAULT_MAX_RECORDS_PER_FILE,
Reviewer: Max records per file is 4M, but your test case doesn't have more than 4M records. So it only tests sorting within a single file, not across multiple files.

Author: Would you recommend reducing records_per_compacted_file? (At least for the new test case?)
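A standalone sketch of the cross-file property this thread is about (illustrative only, not part of the PR; `records_per_file = 2` is a hypothetical override of the 4M default): when the per-file record cap is smaller than the row count, the descending `dw_last_updated` order must hold across file boundaries, not just within each file.

```python
import pyarrow as pa

# Illustrative per-file cap, far below DEFAULT_MAX_RECORDS_PER_FILE (4M),
# so the 10-row input spans multiple compacted files.
records_per_file = 2

table = pa.table(
    {"pk": [str(i) for i in range(10)], "dw_last_updated": list(range(10))}
)
table = table.sort_by([("dw_last_updated", "descending")])

# Split the sorted output the way a per-file record cap would.
files = [
    table.slice(i, records_per_file)
    for i in range(0, table.num_rows, records_per_file)
]
assert len(files) == 5

# Cross-file property: the last timestamp of each file must be >= the first
# timestamp of the next file.
for prev, nxt in zip(files, files[1:]):
    assert prev["dw_last_updated"].to_pylist()[-1] >= nxt["dw_last_updated"].to_pylist()[0]
```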
        hash_bucket_count=DEFAULT_HASH_BUCKET_COUNT,
        hash_bucket_count=1,
        read_kwargs_provider=None,
        drop_duplicates=True,
        skip_enabled_compact_partition_drivers=[CompactorVersion.V1],
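As a sanity check of the expectation encoded above (ascending timestamps in, reversed rows out, given a single hash bucket), here is a standalone pyarrow sketch, separate from the PR itself:

```python
import datetime as dt
import pyarrow as pa

# The input carries dw_last_updated in ascending order; a descending sort on
# that column within the single hash bucket should exactly reverse the rows.
input_table = pa.table(
    {
        "pk_col_1": [str(i) for i in range(10)],
        "dw_last_updated": [dt.datetime(year, 1, 1) for year in range(2000, 2010)],
    }
)
result = input_table.sort_by([("dw_last_updated", "descending")])
assert result["pk_col_1"].to_pylist() == [str(i) for i in reversed(range(10))]
```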
@@ -5,6 +5,7 @@
import boto3
from boto3.resources.base import ServiceResource
import pyarrow as pa
from deltacat.constants import DW_LAST_UPDATED_COLUMN_NAME
from deltacat.io.ray_plasma_object_store import RayPlasmaObjectStore
from pytest_benchmark.fixture import BenchmarkFixture
@@ -274,16 +275,24 @@ def test_compact_partition_rebase_same_source_and_destination(
        compacted_delta_locator, storage_type=StorageType.LOCAL, **ds_mock_kwargs
    )
    actual_rebase_compacted_table = pa.concat_tables(tables)
Reviewer: We can add one assertion if the result should yield multiple tables.

Author: If I understand correctly, the suggestion is to add an assertion on the number of tables returned?
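A sketch of that assertion (assuming `tables` is the per-file list consumed by `pa.concat_tables(tables)` above, and a test case that is expected to materialize more than one file):

```python
# Hypothetical assertion for a test case expected to span multiple files.
assert len(tables) > 1, f"expected multiple compacted files, got {len(tables)}"
```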
    # if no primary key is specified then sort by sort_key for consistent assertion
    sorting_cols: List[Any] = (
        [(val, "ascending") for val in primary_keys] if primary_keys else sort_keys
    )
    rebase_expected_compact_partition_result = (
        rebase_expected_compact_partition_result.combine_chunks().sort_by(sorting_cols)
    )
    actual_rebase_compacted_table = (
        actual_rebase_compacted_table.combine_chunks().sort_by(sorting_cols)
    )
    if DW_LAST_UPDATED_COLUMN_NAME in actual_rebase_compacted_table.column_names:
        # If DW_LAST_UPDATED_COLUMN_NAME is present, don't sort the expected and actual
        # tables; we want to assert on the order of the rows in the table, to validate
        # sorting on timestamp.
        pass
    else:
        # If DW_LAST_UPDATED_COLUMN_NAME is absent, sort by primary key for a consistent
        # assertion. Sort by sort_key if no primary key is specified.
        sorting_cols: List[Any] = (
            [(val, "ascending") for val in primary_keys] if primary_keys else sort_keys
        )
        rebase_expected_compact_partition_result = (
            rebase_expected_compact_partition_result.combine_chunks().sort_by(
                sorting_cols
            )
        )
        actual_rebase_compacted_table = (
            actual_rebase_compacted_table.combine_chunks().sort_by(sorting_cols)
        )
    assert actual_rebase_compacted_table.equals(
        rebase_expected_compact_partition_result
    ), f"{actual_rebase_compacted_table} does not match {rebase_expected_compact_partition_result}"
Reviewer: Assuming the previously compacted table will already be sorted, we can further optimize by only sorting the incremental table.

Author: I think I understand. Would you mind linking a code reference?

Reviewer: Ah, my bad, I used my eyeballs. Yes, I see. For the purposes of a POC, perhaps you can do that in a separate PR. It would indeed be good to avoid wasting time sorting an already-sorted table.
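A sketch of the optimization being discussed (hypothetical helper name and shape; assumes both tables share a schema and are each already sorted descending on the timestamp column): a linear merge of the pre-sorted compacted table with the freshly sorted incremental, instead of re-sorting their union.

```python
import pyarrow as pa


def merge_presorted_desc(
    compacted: pa.Table, incremental: pa.Table, col: str
) -> pa.Table:
    """Merge two tables that are each already sorted descending on `col`
    in O(n + m), avoiding a full re-sort of the combined rows."""
    combined = pa.concat_tables([compacted, incremental])
    a = compacted[col].to_pylist()
    b = incremental[col].to_pylist()
    n = len(a)
    i = j = 0
    take_indices = []
    while i < len(a) and j < len(b):
        if a[i] >= b[j]:  # prefer compacted rows on ties for stability
            take_indices.append(i)
            i += 1
        else:
            take_indices.append(n + j)
            j += 1
    take_indices.extend(range(i, len(a)))  # leftover compacted rows
    take_indices.extend(range(n + j, n + len(b)))  # leftover incremental rows
    return combined.take(take_indices)
```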
Author: I now realize we can use the sort_keys argument to sort the incremental delta instead of hardcoding the column here. That was okay for the POC, but we want to limit this hardcoding to our internal packages.
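What that follow-up might look like, hedged: assuming deltacat's SortKey.of accepts a sort order (the SortOrder.DESCENDING argument and import path below are assumptions, not verified against this PR), the caller could request the timestamp ordering through the existing sort_keys parameter:

```python
from deltacat.constants import DW_LAST_UPDATED_COLUMN_NAME
from deltacat.storage import SortKey, SortOrder  # import path assumed

# Hypothetical: express the descending timestamp ordering via sort_keys
# instead of a column name hardcoded inside the compactor.
sort_keys = [
    SortKey.of(
        key_name=DW_LAST_UPDATED_COLUMN_NAME,
        sort_order=SortOrder.DESCENDING,  # parameter name assumed
    ),
    SortKey.of(key_name="sk_col_1"),
    SortKey.of(key_name="sk_col_2"),
]
```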