Skip to content

Commit

Permalink
Implement RightSemi join for SortMergeJoin
Browse files Browse the repository at this point in the history
  • Loading branch information
athultr1997 committed Nov 27, 2024
1 parent 4e5e765 commit 493804f
Show file tree
Hide file tree
Showing 3 changed files with 695 additions and 208 deletions.
31 changes: 29 additions & 2 deletions datafusion/core/tests/fuzz_cases/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use arrow::compute::SortOptions;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use arrow_schema::Schema;
use log::debug;
use std::sync::Arc;
use std::time::SystemTime;

Expand Down Expand Up @@ -186,7 +187,7 @@ async fn test_full_join_1k_filtered() {
}

#[tokio::test]
async fn test_semi_join_1k() {
async fn test_left_semi_join_1k() {
JoinFuzzTestCase::new(
make_staggered_batches(1000),
make_staggered_batches(1000),
Expand All @@ -198,7 +199,7 @@ async fn test_semi_join_1k() {
}

#[tokio::test]
async fn test_semi_join_1k_filtered() {
async fn test_left_semi_join_1k_filtered() {
JoinFuzzTestCase::new(
make_staggered_batches(1000),
make_staggered_batches(1000),
Expand All @@ -209,6 +210,30 @@ async fn test_semi_join_1k_filtered() {
.await
}

#[tokio::test]
async fn test_right_semi_join_1k() {
JoinFuzzTestCase::new(
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::RightSemi,
None,
)
.run_test(&[HjSmj, NljHj], true)
.await
}

#[tokio::test]
async fn test_right_semi_join_1k_filtered() {
JoinFuzzTestCase::new(
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::RightSemi,
Some(Box::new(col_lt_col_filter)),
)
.run_test(&[HjSmj, NljHj], true)
.await
}

#[tokio::test]
async fn test_anti_join_1k() {
JoinFuzzTestCase::new(
Expand Down Expand Up @@ -478,9 +503,11 @@ impl JoinFuzzTestCase {
let task_ctx = ctx.task_ctx();

let hj = self.hash_join();
debug!("hj = {:?}", hj);
let hj_collected = collect(hj, task_ctx.clone()).await.unwrap();

let smj = self.sort_merge_join();
debug!("smj = {:#?}", smj);
let smj_collected = collect(smj, task_ctx.clone()).await.unwrap();

let nlj = self.nested_loop_join();
Expand Down
Loading

0 comments on commit 493804f

Please sign in to comment.