Commit

Added documentation for SortMergeJoin
athultr1997 committed Nov 18, 2024
1 parent 6b0570b commit c6554ec
Showing 1 changed file with 42 additions and 4 deletions.
46 changes: 42 additions & 4 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -68,8 +68,43 @@ use crate::{
     RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 
-/// join execution plan executes partitions in parallel and combines them into a set of
-/// partitions.
+/// Join execution plan that executes equi-join predicates on multiple partitions using the
+/// Sort-Merge join algorithm and applies an optional filter after the join. Can be used to join
+/// arbitrarily large inputs where one or both of the inputs don't fit in the available memory.
+///
+/// # Join Expressions
+///
+/// Equi-join predicate (e.g. `<col1> = <col2>`) expressions are represented by [`Self::on`].
+///
+/// Non-equality predicates, which cannot be pushed down to the join inputs (e.g.
+/// `<col1> != <col2>`), are known as "filter expressions" and are evaluated
+/// after the equi-join predicates. They are represented by [`Self::filter`]. These are optional
+/// expressions.
+///
+/// # Sorting
+///
+/// Assumes that both the left and right inputs to the join are pre-sorted. It is not the
+/// responsibility of this execution plan to sort the inputs.
+///
+/// # "Streamed" vs "Buffered"
+///
+/// The buffered input is buffered for all record batches having the same join key value.
+/// If memory usage exceeds the configured limit and spilling is enabled,
+/// buffered batches may be spilled to disk. If spilling is disabled, the execution
+/// fails under the same conditions. Multiple record batches of the buffered input may be
+/// present in memory or on disk during execution.
+///
+/// Only one record batch of the streamed input is present in memory at any time. There is no
+/// spilling support for the streamed input. Join key values in the streamed input are compared
+/// with join key values in the buffered input. One row in a streamed record
+/// batch can match multiple rows in the buffered input batches.
+///
+/// Depending on the type of join, either the left or the right input is selected as the
+/// streamed input and the other as the buffered input. For example, in a left-outer join, the
+/// left execution plan is selected as the streamed input.
+///
+/// Reference for the algorithm:
+/// <https://en.wikipedia.org/wiki/Sort-merge_join>
 #[derive(Debug, Clone)]
 pub struct SortMergeJoinExec {
     /// Left sorted joining execution plan
Expand Down Expand Up @@ -529,6 +564,9 @@ struct StreamedJoinedChunk {
buffered_indices: UInt64Builder,
}

/// Represents a record batch from streamed input.
///
/// Also stores information of matching rows from buffered batches.
struct StreamedBatch {
/// The streamed record batch
pub batch: RecordBatch,
Expand Down Expand Up @@ -667,8 +705,8 @@ impl BufferedBatch {
}
}

/// Sort-merge join stream that consumes streamed and buffered data stream
/// and produces joined output
/// Sort-Merge join stream that consumes streamed and buffered data streams
/// and produces joined output stream.
struct SortMergeJoinStream {
/// Current state of the stream
pub state: SortMergeJoinState,
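
The "streamed" vs "buffered" split described in the new `SortMergeJoinExec` documentation can be illustrated with a small standalone sketch. This is not DataFusion code: the function `sort_merge_inner_join`, the `(i32, &str)` row shape, and the closure-based filter are all hypothetical stand-ins chosen for illustration. It assumes both inputs are already sorted by key, handles only the inner-join case, and ignores record batches and spilling entirely.

```rust
/// Illustrative inner sort-merge join over two inputs pre-sorted by key.
/// (Hypothetical helper, not part of DataFusion.)
///
/// `streamed` is walked one row at a time. For each streamed key, the run of
/// `buffered` rows sharing that key is treated as the "buffered" group.
/// `filter` stands in for the optional filter applied after the key match.
fn sort_merge_inner_join<'a, F>(
    streamed: &[(i32, &'a str)],
    buffered: &[(i32, &'a str)],
    filter: F,
) -> Vec<(i32, &'a str, &'a str)>
where
    F: Fn(&str, &str) -> bool,
{
    let mut out = Vec::new();
    // Start of the buffered group currently being compared against.
    let mut b_start = 0;

    for &(s_key, s_val) in streamed {
        // Advance past buffered rows whose key is smaller than the streamed key.
        while b_start < buffered.len() && buffered[b_start].0 < s_key {
            b_start += 1;
        }
        // Find the end of the buffered group whose key equals the streamed key.
        let mut b_end = b_start;
        while b_end < buffered.len() && buffered[b_end].0 == s_key {
            b_end += 1;
        }
        // One streamed row may match several buffered rows.
        for &(_, b_val) in &buffered[b_start..b_end] {
            if filter(s_val, b_val) {
                out.push((s_key, s_val, b_val));
            }
        }
        // `b_start` is left unchanged so that a following streamed row with
        // the same key is matched against the same buffered group.
    }
    out
}

fn main() {
    let streamed = [(1, "s1"), (2, "s2a"), (2, "s2b"), (4, "s4")];
    let buffered = [(2, "b2a"), (2, "b2b"), (3, "b3"), (4, "b4")];

    // The filter rejects one buffered row after the equi-join match on the key.
    let joined = sort_merge_inner_join(&streamed, &buffered, |_, b| b != "b2b");
    assert_eq!(
        joined,
        vec![(2, "s2a", "b2a"), (2, "s2b", "b2a"), (4, "s4", "b4")]
    );
    println!("{joined:?}");
}
```

The sketch keeps the buffered group as a slice of an in-memory array. The documentation above describes the real operator as holding that group as one or more record batches that may be spilled to disk.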