Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Docs: Add docs to RepartitionExec and architecture guide #7003

Merged
merged 2 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,13 +330,17 @@
//! ```
//!
//! [`ExecutionPlan`]s process data using the [Apache Arrow] memory
//! format, largely with functions from the [arrow] crate. When
//! [`execute`] is called, a [`SendableRecordBatchStream`] is returned
//! that produces the desired output as a [`Stream`] of [`RecordBatch`]es.
//! format, making heavy use of functions from the [arrow]
//! crate. Calling [`execute`] produces 1 or more partitions of data,
//! consisting an operator that implements
//! [`SendableRecordBatchStream`].
//!
//! Values are
//! represented with [`ColumnarValue`], which are either single
//! constant values ([`ScalarValue`]) or Arrow Arrays ([`ArrayRef`]).
//! Values are represented with [`ColumnarValue`], which are either
//! [`ScalarValue`] (single constant values) or [`ArrayRef`] (Arrow
//! Arrays).
//!
//! Balanced parallelism is achieved using [`RepartitionExec`], which
//! implements a [Volcano style] "Exchange".
//!
//! [`execute`]: physical_plan::ExecutionPlan::execute
//! [`SendableRecordBatchStream`]: crate::physical_plan::SendableRecordBatchStream
Expand All @@ -345,9 +349,10 @@
//! [`ArrayRef`]: arrow::array::ArrayRef
//! [`Stream`]: futures::stream::Stream
//!
//!
//! See the [implementors of `ExecutionPlan`] for a list of physical operators available.
//!
//! [`RepartitionExec`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html
//! [Volcano style]: https://w6113.github.io/files/papers/volcanoparallelism-89.pdf
//! [implementors of `ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#implementors
//!
//! ## State Management and Configuration
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
fn schema(&self) -> SchemaRef;
}

/// Trait for a stream of record batches.
/// Trait for a [`Stream`] of [`RecordBatch`]es
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;

/// EmptyRecordBatchStream can be used to create a RecordBatchStream
Expand Down
63 changes: 61 additions & 2 deletions datafusion/core/src/physical_plan/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,67 @@ impl BatchPartitioner {
}
}

/// The repartition operator maps N input partitions to M output partitions based on a
/// partitioning scheme. No guarantees are made about the order of the resulting partitions.
/// Maps `N` input partitions to `M output partitions based on a
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Screenshot 2023-07-17 at 12 21 43 PM

alamb marked this conversation as resolved.
Show resolved Hide resolved
/// [`Partitioning`] scheme.
///
/// # Background
///
/// DataFusion, like most other commercial systems, with the the
/// notable exception of DuckDB, uses the "Exchange Operator" based
/// approach to parallelism which works well in practice given
/// sufficient care in implementation.
///
/// DataFusion's planner picks the target number of partitions and
/// then `RepartionExec` redistributes [`RecordBatch`]es to that number
/// of output partitions.
///
/// For example, given `target_partitions=3` (trying to use 3 cores)
/// but scanning an input with 2 partitions, `RepartitionExec` can be
/// used to get 3 even streams of `RecordBatch`es
///
///
///```text
/// ▲ ▲ ▲
/// │ │ │
/// │ │ │
/// │ │ │
///┌───────────────┐ ┌───────────────┐ ┌───────────────┐
///│ GroupBy │ │ GroupBy │ │ GroupBy │
///│ (Partial) │ │ (Partial) │ │ (Partial) │
///└───────────────┘ └───────────────┘ └───────────────┘
/// ▲ ▲ ▲
/// └──────────────────┼──────────────────┘
/// │
/// ┌─────────────────────────┐
/// │ RepartitionExec │
/// │ (hash/round robin) │
/// └─────────────────────────┘
/// ▲ ▲
/// ┌───────────┘ └───────────┐
/// │ │
/// │ │
/// .─────────. .─────────.
/// ,─' '─. ,─' '─.
/// ; Input : ; Input :
/// : Partition 0 ; : Partition 1 ;
/// ╲ ╱ ╲ ╱
/// '─. ,─' '─. ,─'
/// `───────' `───────'
///```
///
/// # Output Ordering
///
/// No guarantees are made about the order of the resulting
/// partitions unless `preserve_order` is set.
///
/// # Footnote
///
/// The "Exchange Operator" was first described in the 1989 paper
/// [Encapsulation of parallelism in the Volcano query processing
/// system
/// Paper](https://w6113.github.io/files/papers/volcanoparallelism-89.pdf)
/// which uses the term "Exchange" for the concept of repartitioning
/// data across threads.
#[derive(Debug)]
pub struct RepartitionExec {
/// Input execution plan
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub enum LogicalPlan {
Join(Join),
/// Apply Cross Join to two logical plans
CrossJoin(CrossJoin),
/// Repartition the plan based on a partitioning scheme.
/// Repartition the plan based on a partitioning scheme
Repartition(Repartition),
/// Union multiple inputs
Union(Union),
Expand Down