
Conversation

@gabotechs
Contributor

@gabotechs gabotechs commented May 19, 2025

Which issue does this PR close?

Rationale for this change

Make RepartitionExec propagate .execute() calls immediately to its child input. More details in the issue.

What changes are included in this PR?

Refactor RepartitionExecState so that it can be in three different states (see the sketch after this list):

  • NotInitialized <- the node was just instantiated but not executed yet
  • InputStreamsInitialized <- the node was executed and the input streams are initialized, but they have not yet been polled
  • ConsumingInputStreams <- the node is in the process of consuming the streams, polling messages out of them
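
A minimal sketch of this state machine, with placeholder payload types so it stands alone (the exact fields used in the PR may differ):

/// Hedged sketch of the three states described above; the payload types
/// below are placeholders for illustration, not the PR's actual types.
enum RepartitionExecState {
    /// The node was instantiated, but `execute()` has not been called yet.
    NotInitialized,
    /// `execute()` ran and the input streams were created, but nothing
    /// has been polled from them yet.
    InputStreamsInitialized(Vec<InputStreamWithMetrics>),
    /// The input streams are actively being polled and their batches are
    /// routed to the output partitions.
    ConsumingInputStreams(ConsumingInputStreamsState),
}

// Placeholder types, defined only so the sketch is self-contained.
struct InputStreamWithMetrics;
struct ConsumingInputStreamsState;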

Are these changes tested?

By existing tests, adapted where needed.

Are there any user-facing changes?

Anyone with a custom PhysicalPlan implementation placed anywhere beneath a RepartitionExec should now expect their custom_physical_plan.execute() to be called immediately, without needing to poll any message from the global Arrow stream first.

@gabotechs gabotechs changed the title Propagate .execute() calls immediately instead of lazily on the first… Propagate .execute() calls immediately in RepartitionExec May 19, 2025
Comment on lines -1499 to +1550
assert_eq!(batches_without_drop, batches_with_drop);
fn sort(batch: Vec<RecordBatch>) -> Vec<RecordBatch> {
    batch
        .into_iter()
        .sorted_by_key(|b| format!("{b:?}"))
        .collect()
}

assert_eq!(sort(batches_without_drop), sort(batches_with_drop));
Contributor Author

Found this test to be flaky after my changes. Just ordering the record batches removes the flakiness. Not sure why this started being flaky now though...

Contributor

Maybe now the streams are actually subject to tokio's scheduling, whereas before it was the first one that was executed 🤔

Contributor Author

🤔 they are yeah, not 100% sure why they weren't before though

@gabotechs gabotechs force-pushed the repartition-exec-execute-propagation branch from 3f91bc3 to c803dd2 Compare May 19, 2025 10:18
///
/// Uses a parking_lot `Mutex` to control other accesses as they are very short duration
/// (e.g. removing channels on completion) where the overhead of `await` is not warranted.
type LazyState = Arc<tokio::sync::OnceCell<Mutex<RepartitionExecState>>>;
Contributor Author

cc @crepererum as you are the original author of this approach

Contributor

Is there a reason to remove the LazyState approach? The scenario described in the comments still seems applicable (even during execute() I think)

FWIW I believe @crepererum is out this week, so we would have to wait until next week for his input

Contributor Author

@gabotechs gabotechs May 19, 2025

Actually one of the main points of the PR is removing this LazyState. The issue is that LazyState::get_or_init() is an async method, and therefore, it needs to be called within an async context.

As PhysicalPlan::execute is not async, we are forced to initialize the LazyState inside the futures::stream::once(async move { ... }) block, which means that LazyState::get_or_init() will not be called until the first message in the stream is polled, thereby delaying the .execute() call to the child input.

I see that the purpose of introducing LazyState in #10009 was to reduce lock contention in RepartitionExec::execute calls. My guess is that this can be solved more simply by checking an AtomicBool so that only one caller locks the state, letting the other threads continue without taking the lock in RepartitionExec::execute, and therefore allowing us to call input.execute() synchronously upon a RepartitionExec::execute call.

Not sure if there's a middle-ground solution that lets us keep the LazyState. I'll try to think of something, but otherwise I'm happy to wait for @crepererum's input next week.
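
For context, a minimal sketch of the AtomicBool idea described above, with hypothetical type and field names (the review below points out a race in exactly this pattern):

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use parking_lot::Mutex;

// Hypothetical stand-in for RepartitionExecState; names are illustrative.
#[derive(Default)]
struct State { streams_initialized: bool }

// Only the first execute() call flips the flag and takes the lock to do the
// initialization synchronously; later calls skip both the flag and the lock.
fn execute(flag: &AtomicBool, state: &Arc<Mutex<State>>) {
    if !flag.swap(true, Ordering::Relaxed) {
        let mut guard = state.lock();
        // e.g. call input.execute(i, ctx) for each input partition here
        guard.streams_initialized = true;
    }
}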

Contributor

@alamb alamb left a comment

Thank you @gabotechs -- most of this PR makes perfect sense to me. The only thing I see that worries me is the removal of LazyState (tokio::sync::OnceCell).

I suggest we restore the use of LazyState in this PR so we can merge it, and then make a separate follow-on PR to remove LazyState and wait for @crepererum to review.



@gabotechs gabotechs force-pushed the repartition-exec-execute-propagation branch from c803dd2 to 9c7a78e Compare May 19, 2025 16:17
Contributor

@alamb alamb left a comment

Makes sense to me -- thank you @gabotechs

I think it would also be good for @crepererum to review if he has time next week

Comment on lines 655 to 656
if !self.state_initialized.swap(true, Ordering::Relaxed) {
    state.lock().ensure_input_streams_initialized(
Contributor

The observation made by @gabotechs here #16093 (comment) is correct:

  1. The original intention was to make execute not contend.
  2. The delay is indeed a breakage of the API contract.

However I want to broaden the discussion a bit:

This is racy: the first thread could check the boolean but not yet take the lock, while other threads skip the if body and start to poll. At that point they take the lock (around line 670 with the PR applied, near the comment "lock mutexes") and you are either blocking the async IO runtime with initialization work, or, with your implementation, you just get the error RepartitionExecState::init_input_streams must be called before consuming input streams (this needs to be fixed before merging because it might happen under high load!).

And even if you found a non-racy API that combines the boolean with the lock, you would still have the same semantic race. I think the question is: are we allowed to poll streams even if not all execute calls have finished? I would say: yes. In theory you could even interleave:

  1. execute partition 0
  2. poll partition 0
  3. execute partition 1
  4. poll partition 1
  5. ...

I think in general this problem cannot be fully fixed though: you either block during execute or you potentially block during poll, at least as long as the execute method needs to be called PER PARTITION -- which TBH might have been a somewhat unfortunate choice; I would rather call it once and return a vector of streams.

So the question is: is this PR better than the status quo? I would say yes, but I would like to see at least one additional test that simulates the race described above so it doesn't error.
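
For illustration, a hypothetical shape of the "call execute once and return a vector of streams" alternative mentioned above; this is not DataFusion's actual ExecutionPlan API, and the stream type is a placeholder:

// Placeholder type so the sketch stands alone.
struct PartitionStream;

// Execute the whole node once and receive one output stream per partition,
// instead of calling execute() separately for every output partition.
trait ExecuteAllPartitions {
    fn execute_all(&self) -> Vec<PartitionStream>;
}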

Contributor Author

🤔 The only way I can imagine this being racy is:

  1. Thread 1 and 2 call .execute() at the same time
  2. Thread 1 wins the atomic bool, but still has not acquired the lock (it's about to, it's just on the next line)
  3. while Thread 1 is between lines 655 and 656 Thread 2:
    • skips the if statement
    • builds the stream
    • polls the stream
    • acquires the lock
    • fails
  4. Thread 1 finally moves from line 655 to 656 and tries to acquire the lock, but it's too late now

Seems highly unlikely (we've run this under high load for a while and it has never happened), and I'm not sure if we can replicate this reliably in a unit test, but it is theoretically possible.

A couple of options come to mind:

  • Remove the AtomicBool, but then we are back to locking on every thread
  • Use a tokio::sync::oneshot channel that allows awaiting for something to be there during polling

which TBH might have been a somewhat unfortunate choice, I would rather call it once and return a vector of streams.

👍 This would greatly simplify the approach

Contributor

Seems highly unlikely (we've run this under high load for a while and it was never happened)

I don't fancy hunting Heisenbugs in production. Esp. some cloud deployments with low/partial CPU allocations might be subject to this issue. You can easily stall a thread for a few milliseconds.

I'm not sure if we can replicate this reliably in a unit test

Fair, but the fact that the code, as typed, allows that to happen is reason to change it. I'm OK without a test then. The expected behavior would then be that you might need to block during stream polling (because thread 1 may still hold the lock while it runs the preparation step, or the said poller needs to do that work itself).

Contributor Author

Pushed a small change in 1b276cc. Now:

  1. Thread 1 and 2 call .execute() at the same time
  2. Thread 1 wins the atomic bool, but still has not acquired the lock (it's about to, it's just on the next line)
  3. while Thread 1 is between lines 655 and 656 Thread 2:
    • skips the if statement
    • builds the stream
    • polls the stream
    • acquires the lock
  4. Thread 1 finally moves from line 655 to 656 and tries to acquire the lock, but it's locked by Thread 2, so it waits until it gets released
  5. Thread 2 does all the initialization work inside the stream polling, releasing the lock
  6. Thread 1 acquires the lock, but there's nothing to do, as all the initialization already happened, so it moves on

Contributor Author

Pushed another small change to further simplify the setup: 21db4d5

It removes the state_initialized: Arc<AtomicBool> in favor of optimistically locking the state: if the state is already locked, execute just moves on without blocking; otherwise, the lock is acquired.
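
A minimal sketch of that optimistic-locking pattern, with hypothetical names (the actual change is in 21db4d5):

use std::sync::Arc;
use parking_lot::Mutex;

// Hypothetical stand-in for RepartitionExecState; names are illustrative.
#[derive(Default)]
struct State { streams_initialized: bool }

// execute() tries to take the lock without blocking. If another thread
// already holds it, that thread is doing the initialization, so we move on;
// the returned stream takes the lock again when it is first polled and
// finishes any initialization that is still missing.
fn execute(state: &Arc<Mutex<State>>) {
    if let Some(mut guard) = state.try_lock() {
        if !guard.streams_initialized {
            // e.g. call input.execute(i, ctx) for each input partition here
            guard.streams_initialized = true;
        }
    }
}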

Contributor Author

WDYT @crepererum? I'm also very open to simpler approaches

…the RepartitionExecState if it was not initialized
@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label May 26, 2025
Contributor

@crepererum crepererum left a comment

Looks good, thank you.

@alamb
Contributor

alamb commented May 28, 2025

Looks all good to me, so let's go! 🚀

@alamb alamb merged commit 56a2af7 into apache:main May 28, 2025
27 checks passed

Labels

physical-plan Changes to the physical-plan crate


Development

Successfully merging this pull request may close these issues.

RepartitionExec not immediately propagating .execute() calls to children

3 participants