PoC Adaptive round robin repartitioning #13699

Draft: wants to merge 5 commits into main
@@ -836,7 +836,8 @@ fn add_roundrobin_on_top(
     n_target: usize,
 ) -> Result<DistributionContext> {
     // Adding repartition is helpful:
-    if input.plan.output_partitioning().partition_count() < n_target {
+    // TODO: should we only do for certain sources?
+    if input.plan.output_partitioning().partition_count() < n_target || input.plan.children().is_empty() {
         // When there is an existing ordering, we preserve ordering
         // during repartition. This will be un-done in the future
         // If any of the following conditions is true
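The hunk above broadens the condition under which `add_roundrobin_on_top` inserts a `RoundRobinBatch` repartition: besides plans that expose fewer partitions than the target, leaf nodes (plans with no children, i.e. sources) now also qualify, so the adaptive repartitioning below can rebalance skew coming straight out of a scan. A minimal sketch of the new predicate, with a hypothetical helper name and the same calls the diff uses:

```rust
use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical helper (not in the PR): the condition the PoC checks before
// adding a RoundRobinBatch repartition on top of `plan`.
fn should_add_round_robin(plan: &Arc<dyn ExecutionPlan>, n_target: usize) -> bool {
    // Original rule: the plan produces fewer partitions than the target.
    plan.output_partitioning().partition_count() < n_target
        // PoC addition: also repartition leaf (source) nodes, even when they already
        // expose enough partitions, so skewed partitions can be rebalanced downstream.
        || plan.children().is_empty()
}
```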
69 changes: 57 additions & 12 deletions datafusion/physical-plan/src/repartition/mod.rs
@@ -779,9 +779,11 @@ impl RepartitionExec {
metrics: RepartitionMetrics,
context: Arc<TaskContext>,
) -> Result<()> {
let is_round_robin = matches!(partitioning, Partitioning::RoundRobinBatch(_));
let num_partitions = partitioning.partition_count();

let mut partitioner =
BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;

// execute the child operator
let timer = metrics.fetch_time.timer();
let mut stream = input.execute(partition, context)?;
@@ -801,22 +803,65 @@
None => break,
};

for res in partitioner.partition_iter(batch)? {
'next_batch: for res in partitioner.partition_iter(batch)? {
let (partition, batch) = res?;
let size = batch.get_array_memory_size();

let timer = metrics.send_time[partition].timer();
// if there is still a receiver, send to it
if let Some((tx, reservation)) = output_channels.get_mut(&partition) {
reservation.lock().try_grow(size)?;

if tx.send(Some(Ok(batch))).await.is_err() {
// If the other end has hung up, it was an early shutdown (e.g. LIMIT)
reservation.lock().shrink(size);
output_channels.remove(&partition);
if is_round_robin {
// try out the default (round robin) partition, then all other partitions once starting with partition + 1
for p in std::iter::once(partition).chain(partition+1..num_partitions).chain(0..partition) {
let timer = metrics.send_time[partition].timer();

if let Some((tx, reservation)) = output_channels.get_mut(&p) {
reservation.lock().try_grow(size)?;

let sent = tx.send(Some(Ok(batch.clone()))).now_or_never();

match sent {
Some(res) if res.is_err() => {
// If the other end has hung up, it was an early shutdown (e.g. LIMIT)
reservation.lock().shrink(size);
output_channels.remove(&partition);
}
Some(_) => {
timer.done();
continue 'next_batch;
}
None => {
timer.done();
}
}
}
}
// not yet succeeded to send to any partition, now send to the partition and wait for it
let timer: metrics::ScopedTimerGuard<'_> =
metrics.send_time[partition].timer();
// if there is still a receiver, send to it
if let Some((tx, reservation)) = output_channels.get_mut(&partition) {
reservation.lock().try_grow(size)?;

if tx.send(Some(Ok(batch))).await.is_err() {
// If the other end has hung up, it was an early shutdown (e.g. LIMIT)
reservation.lock().shrink(size);
output_channels.remove(&partition);
}
}
timer.done();
} else {
let timer: metrics::ScopedTimerGuard<'_> =
metrics.send_time[partition].timer();
// if there is still a receiver, send to it
if let Some((tx, reservation)) = output_channels.get_mut(&partition) {
reservation.lock().try_grow(size)?;

if tx.send(Some(Ok(batch))).await.is_err() {
// If the other end has hung up, it was an early shutdown (e.g. LIMIT)
reservation.lock().shrink(size);
output_channels.remove(&partition);
}
}
timer.done();
}
timer.done();
}

// If the input stream is endless, we may spin forever and
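Taken together, the repartition change makes the round-robin send adaptive: each batch first gets a non-blocking send attempt (`now_or_never`) on its assigned round-robin partition, then on every other open output once (wrapping around from `partition + 1`), and only if every output is currently full does the operator fall back to an awaited, back-pressuring send on the original partition. A self-contained sketch of that strategy, using plain `tokio` mpsc channels instead of DataFusion's distribution channels and a hypothetical `adaptive_send` helper:

```rust
use futures::FutureExt; // for now_or_never()
use tokio::sync::mpsc::Sender;

/// Hypothetical helper (not the PR's code): push `item` to the first output that
/// can take it without waiting, preferring `preferred` and wrapping around.
async fn adaptive_send<T: Clone>(outputs: &[Sender<T>], preferred: usize, item: T) {
    let n = outputs.len();
    // Probe order: preferred, preferred + 1, ..., n - 1, 0, ..., preferred - 1.
    let probe_order = std::iter::once(preferred)
        .chain(preferred + 1..n)
        .chain(0..preferred);

    for p in probe_order {
        // Poll the send future exactly once; `None` means the channel is full
        // right now, so we try the next output instead of waiting on this one.
        match outputs[p].send(item.clone()).now_or_never() {
            Some(Ok(())) => return,   // output `p` accepted the item immediately
            Some(Err(_)) => continue, // receiver hung up (e.g. early LIMIT shutdown)
            None => continue,         // output `p` is full, try the next one
        }
    }

    // Every output was full: wait on the preferred output so back pressure still
    // propagates to the input when all consumers are busy.
    let _ = outputs[preferred].send(item).await;
}
```

Note that the speculative sends have to clone the batch: when `now_or_never` returns `None`, the half-driven send future (and the clone it owns) is simply dropped. For Arrow `RecordBatch`es that clone is cheap because the underlying buffers are reference-counted.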