feat(7181): cascading loser tree merges #7379

Closed · wants to merge 30 commits

Commits (30)
5eaaeec
refactor(7181): move the management of input data, and output data, t…
wiedld Aug 15, 2023
a3870d0
feat(7181): add cursor.seek()
wiedld Aug 15, 2023
e54e92c
refactor(7181): move streaming_merge() into separate mod from the mer…
wiedld Aug 15, 2023
3d43e97
feat(7181): streaming_merge() consumes SortPreservingCascadeStream
wiedld Aug 15, 2023
1a6a364
feat(7181): change BatchBuilder to be a SortOrder Builder, with the S…
wiedld Aug 15, 2023
28454c5
feat(7181): add slice() to Cursor trait
wiedld Aug 17, 2023
b766712
feat(7181): make SortOrderBuilder yield in a stream.
wiedld Aug 17, 2023
9ff37f3
feat(7181): enable the ability to create a multi-layer merge cascade.
wiedld Aug 22, 2023
eb647ea
feat(7181): build multiple-level cascade tree.
wiedld Aug 22, 2023
8cd22a0
feat(7181): use RecordBatch tracking to avoid expensive slicing of ba…
wiedld Aug 28, 2023
173577b
fix(7181): improve performance by using hasher on tuple (unique slice…
wiedld Aug 29, 2023
b0f1402
chore(7181): make a zero-cost BatchId type, for more explicit code
wiedld Aug 31, 2023
9ea3a65
refactor: comment the major streaming structures, and how they are in…
wiedld Aug 31, 2023
7be30c2
refactor: use u64 as batch_id in cascading merge sort
wiedld Sep 1, 2023
0e9573d
feat(7181): convert into generic ReceiverStream, such that can be reu…
wiedld Sep 2, 2023
c439138
feat(7181): add buffered multithreading to merge streams
wiedld Sep 2, 2023
fca522b
test(7181): have sort preserving merge tests, run in both single thre…
wiedld Sep 3, 2023
50c8636
chore: TMP COMMIT pointing at arrow-rs branch, for CI pipeline
wiedld Sep 3, 2023
cfa32fa
Merge branch 'main' into 7181/cascading-loser-tree-merges
wiedld Sep 13, 2023
d520496
chore: clippy and linter
wiedld Sep 13, 2023
a324ef8
fix(7181): have RowCursor slicing be within a single arc-refed Rows
wiedld Sep 15, 2023
d3613bd
feat(7181): have BatchCursor be the primary struct passed around
wiedld Sep 15, 2023
3786021
feat(7181): update documentation for the cascaded merge
wiedld Sep 15, 2023
2932bd5
Merge branch 'main' into 7181/cascading-loser-tree-merges
wiedld Sep 15, 2023
8701220
fix: add apache license header to new mods
wiedld Sep 15, 2023
828a5d1
Merge branch 'main' into 7181/cascading-loser-tree-merges
wiedld Sep 19, 2023
0dfc60c
Merge branch 'main' into 7181/cascading-loser-tree-merges
wiedld Sep 19, 2023
e642420
Merge branch 'main' into 7181/cascading-loser-tree-merges
wiedld Oct 5, 2023
f97cc4d
feat(7181): remove mutex around polled stream.
wiedld Sep 19, 2023
9b10198
Merge branch 'main' into 7181/cascading-loser-tree-merges
wiedld Oct 10, 2023
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ impl ExecutionPlan for ParquetExec {
let stream =
FileStream::new(&self.base_config, partition_index, opener, &self.metrics)?;

Ok(Box::pin(stream))
Ok(Box::pin(stream) as SendableRecordBatchStream)
}

fn metrics(&self) -> Option<MetricsSet> {
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ tokio = { version = "1.28", features = ["sync", "fs", "parking_lot"] }
uuid = { version = "^1.2", features = ["v4"] }

[dev-dependencies]
paste = "1.0.14"
rstest = "0.18.0"
termtree = "0.4.1"
tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
10 changes: 6 additions & 4 deletions datafusion/physical-plan/src/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use datafusion_common::{internal_err, DataFusionError, Result};
use futures::StreamExt;

use super::expressions::PhysicalSortExpr;
use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use super::stream::{
ReceiverStream, RecordBatchReceiverStreamAdaptor, RecordBatchStreamAdapter,
};
use super::{DisplayAs, Distribution, SendableRecordBatchStream};
use datafusion_execution::TaskContext;

Expand Down Expand Up @@ -155,11 +157,11 @@ impl ExecutionPlan for AnalyzeExec {
// parallel (on a separate tokio task) using a JoinSet to
// cancel outstanding futures on drop
let num_input_partitions = self.input.output_partitioning().partition_count();
let mut builder =
RecordBatchReceiverStream::builder(self.schema(), num_input_partitions);
let mut builder = ReceiverStream::builder(self.schema(), num_input_partitions);
let input = Arc::new(RecordBatchReceiverStreamAdaptor::new(self.input.clone()));

for input_partition in 0..num_input_partitions {
builder.run_input(self.input.clone(), input_partition, context.clone());
builder.run_input(input.clone(), input_partition, context.clone());
}

// Create future that computes the final output
Expand Down
8 changes: 5 additions & 3 deletions datafusion/physical-plan/src/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;

use super::expressions::PhysicalSortExpr;
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::stream::{ObservedStream, RecordBatchReceiverStream};
use super::stream::{ObservedStream, ReceiverStream, RecordBatchReceiverStreamAdaptor};
use super::{DisplayAs, SendableRecordBatchStream, Statistics};

use crate::{DisplayFormatType, EquivalenceProperties, ExecutionPlan, Partitioning};
Expand Down Expand Up @@ -145,12 +145,14 @@ impl ExecutionPlan for CoalescePartitionsExec {
// least one result in an attempt to maximize
// parallelism.
let mut builder =
RecordBatchReceiverStream::builder(self.schema(), input_partitions);
ReceiverStream::builder(self.schema(), input_partitions);
let input =
Arc::new(RecordBatchReceiverStreamAdaptor::new(self.input.clone()));
@wiedld (Contributor, Author) commented on Oct 10, 2023:

RecordBatchReceiverStream was made generic in this commit, such that it can handle either a buffered stream of record_batches or the sort_orders (yielded per merge node).

To make it generic, the following was done:

  • created a StreamAdapter trait, whose StreamAdapter::call() is used by ReceiverStream::run_input().
  • implemented a RecordBatchReceiverStreamAdaptor for record batches.

Please let me know if I should have structured this differently.


// spawn independent tasks whose resulting streams (of batches)
// are sent to the channel for consumption.
for part_i in 0..input_partitions {
builder.run_input(self.input.clone(), part_i, context.clone());
builder.run_input(input.clone(), part_i, context.clone());
}

let stream = builder.build();
Expand Down
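The StreamAdapter refactor described in the review comment above can be illustrated with a small, std-only Rust sketch. The names (`StreamAdapter`, `call`, the adaptor type) mirror the PR, but the real code operates on async streams of `RecordBatch`es via tokio channels; everything below is a simplified, hypothetical stand-in rather than the actual DataFusion API.

```rust
/// Hypothetical stand-in for the trait that lets a generic receiver stream
/// accept different item producers (record batches, sort orders, ...).
trait StreamAdapter {
    type Item;
    /// Produce the items for one input partition
    /// (the real `call()` drives an async stream for `run_input()`).
    fn call(&self, partition: usize) -> Vec<Self::Item>;
}

/// Stand-in for RecordBatchReceiverStreamAdaptor: adapts fixed per-partition
/// data (here just `Vec<i32>` in place of record batches).
struct VecAdapter {
    partitions: Vec<Vec<i32>>,
}

impl StreamAdapter for VecAdapter {
    type Item = i32;
    fn call(&self, partition: usize) -> Vec<i32> {
        self.partitions[partition].clone()
    }
}

/// Stand-in for the generic ReceiverStream builder: drains every partition
/// of any StreamAdapter into one output buffer, regardless of item type.
fn run_all<A: StreamAdapter>(adapter: &A, num_partitions: usize) -> Vec<A::Item> {
    (0..num_partitions).flat_map(|p| adapter.call(p)).collect()
}

fn main() {
    let adapter = VecAdapter {
        partitions: vec![vec![1, 2], vec![3]],
    };
    // The caller is generic over the adapter, mirroring how run_input()
    // no longer cares whether items are batches or sort orders.
    let out = run_all(&adapter, 2);
    assert_eq!(out, vec![1, 2, 3]);
}
```

The design point is that the builder loop (`builder.run_input(input.clone(), ...)` in the diff above) only needs the trait, so one receiver-stream implementation serves both the batch and sort-order cases.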
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Defines common code used in execution plans

use super::SendableRecordBatchStream;
use crate::stream::RecordBatchReceiverStream;
use crate::stream::ReceiverStream;
use crate::{ColumnStatistics, ExecutionPlan, Statistics};
use arrow::datatypes::Schema;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
Expand Down Expand Up @@ -102,7 +102,7 @@ pub(crate) fn spawn_buffered(
Ok(handle)
if handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread =>
{
let mut builder = RecordBatchReceiverStream::builder(input.schema(), buffer);
let mut builder = ReceiverStream::builder(input.schema(), buffer);

let sender = builder.tx();

Expand Down
89 changes: 89 additions & 0 deletions datafusion/physical-plan/src/sorts/batch_cursor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_common::Result;

use super::cursor::Cursor;

pub type BatchId = u64;

#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct BatchOffset(pub usize);

pub type SlicedBatchCursorIdentifier = (BatchId, BatchOffset);

/// The [`BatchCursor`] represents a complete, or partial, [`Cursor`] for a given record batch ([`BatchId`]).
///
/// A record batch (represented by its [`Cursor`]) can be sliced for the following reasons:
/// 1. a merge node takes in 10 streams
/// 2. at any given time, this means up to 10 cursors (record batches) are being merged (e.g. in the loser tree)
/// 3. a merge node will yield once it hits a size limit
/// 4. at the moment of yielding, some cursors may be only partially yielded
///
/// The unique identity of a sliced cursor is denoted by its [`SlicedBatchCursorIdentifier`].
#[derive(Debug)]
pub struct BatchCursor<C: Cursor> {
@wiedld (Contributor, Author) commented on Sep 19, 2023:

This is used in the CursorStream (not the BatchCursorStream which included the actual record batches). Therefore, I think this could have a better name.

It wraps the cursor, and maps it to the original (tracked) batch -- as well as tracking the sliced offset. Naming ideas?

/// The index into BatchTrackingStream::batches
batch: BatchId,
/// The row offset within the given batch, reflecting how far the sliced cursor has advanced.
/// When a batch is partially yielded, the offset->end range determines how much was yielded.
row_offset: BatchOffset,

/// The cursor for the given batch.
pub cursor: C,
}

impl<C: Cursor> BatchCursor<C> {
/// Create a new [`BatchCursor`] from a [`Cursor`] and a [`BatchId`].
///
/// New [`BatchCursor`]s will have a [`BatchOffset`] of 0.
/// Subsequent batch_cursors can be created by slicing.
pub fn new(batch: BatchId, cursor: C) -> Self {
Self {
batch,
row_offset: BatchOffset(0),
cursor,
}
}

/// A unique identifier for a [`BatchCursor`]
pub fn identifier(&self) -> SlicedBatchCursorIdentifier {
(self.batch, self.row_offset)
}

/// Slicing of a batch cursor is done by slicing the underlying cursor
/// and adjusting the [`BatchOffset`]
pub fn slice(&self, offset: usize, length: usize) -> Result<Self> {
Ok(Self {
batch: self.batch,
row_offset: BatchOffset(self.row_offset.0 + offset),
cursor: self.cursor.slice(offset, length)?,
})
}
}

impl<C: Cursor> std::fmt::Display for BatchCursor<C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"BatchCursor(batch: {}, offset: {}, num_rows: {})",
self.batch,
self.row_offset.0,
self.cursor.num_rows()
)
}
}
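The slicing scheme above — a stable `BatchId` plus an accumulated `BatchOffset` — can be demonstrated with a self-contained sketch. `MiniCursor` and `MiniBatchCursor` are hypothetical stand-ins for the crate's `Cursor` implementations and the `BatchCursor` shown in this diff; only the identifier arithmetic is meant to match.

```rust
type BatchId = u64;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct BatchOffset(usize);

/// Hypothetical minimal cursor over rows (stand-in for the Cursor trait).
struct MiniCursor {
    rows: Vec<i64>,
}

impl MiniCursor {
    fn slice(&self, offset: usize, length: usize) -> MiniCursor {
        MiniCursor {
            rows: self.rows[offset..offset + length].to_vec(),
        }
    }
    fn num_rows(&self) -> usize {
        self.rows.len()
    }
}

/// Stand-in for BatchCursor: slicing keeps the BatchId stable and
/// accumulates the offset, so each slice has a unique identifier.
struct MiniBatchCursor {
    batch: BatchId,
    row_offset: BatchOffset,
    cursor: MiniCursor,
}

impl MiniBatchCursor {
    fn identifier(&self) -> (BatchId, BatchOffset) {
        (self.batch, self.row_offset)
    }
    fn slice(&self, offset: usize, length: usize) -> MiniBatchCursor {
        MiniBatchCursor {
            batch: self.batch,
            row_offset: BatchOffset(self.row_offset.0 + offset),
            cursor: self.cursor.slice(offset, length),
        }
    }
}

fn main() {
    let full = MiniBatchCursor {
        batch: 7,
        row_offset: BatchOffset(0),
        cursor: MiniCursor { rows: vec![10, 20, 30, 40] },
    };
    // Suppose a merge node yielded the first two rows; the remainder
    // is a new cursor starting at offset 2 of the same batch.
    let rest = full.slice(2, 2);
    assert_eq!(rest.identifier(), (7, BatchOffset(2)));
    assert_eq!(rest.cursor.num_rows(), 2);
}
```

This is why `(BatchId, BatchOffset)` suffices as the `SlicedBatchCursorIdentifier`: two distinct slices of the same batch can never share an offset.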