Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewite LocalExecutor to be simpler, and to shutdown cleanly on Python 3.10+ #23944

Merged
merged 5 commits into from
Nov 20, 2024

Conversation

ashb
Copy link
Member

@ashb ashb commented May 26, 2022

Something changed between Python 3.7 and 3.10 meaning that a limited
parallelism LocalExecutor scheduler now doesn't shutdown cleanly on
receiving a signal.

On closer inspection of the limited vs unlimited path it apepars to me that
the code was "over-generalized" and the entire concept of self.impl has been
removed hopefully making this code much more direct and easier to understand.

The key things are now:

  • When a task needs to be run, we send the message on a mp.SimpleQueue object,
    and increment an internal counter.

    (We use our own counter, not qsize method as that is not portable)

  • Inside _check_workers we see if we think there are any outstanding messages,
    and create a worker if there are.

    The reason we do this is the on macOS (where the default mp start method is
    "spawn") a process will be started via exeucte_async, but it will take a
    second or two to pull the message of the queue, by which time the scheduler
    will have called executor.sync() again, meaning we'd over create workers
    (but never above the limit).

    Avoiding that case is why we keep the internal _outstanding_messages
    counter -- self.activity_queue.empty() would return False when the worker
    is booting up.

  • Everytime the scheduler calls the sync() method we read out of the result
    queue and decrement the internal counter.

  • We remove the entire use of multiprocessing.Manager -- it doesn't seem to
    do anything other than create queue objects but for our use it just adds
    complexity to understanding

  • Almost as a side-effect we now only create worker subprocesses on demand,
    instead of pre-launching them.

    We do not currently shut down idle processes, though adding it would be
    quite straight forward if we wanted to in the future

This branch name was "rewrite-local-exexc-concurrentfutures" (sic) as when
originally opened in 2022 for 3.10 that was the plan.

However since then 3.12 has come out and it now starts issuing warnings when
Fork and threads are used, and concurrent.futures uses a thread internally, so
a different approach was used.

@ashb ashb requested review from kaxil and XD-DENG as code owners May 26, 2022 16:38
@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label May 26, 2022
@ashb ashb requested review from potiuk and eladkal May 26, 2022 21:58
@ashb
Copy link
Member Author

ashb commented May 26, 2022

This could either be in a 2.3.3 or 2.4.0 -- it's not critical and I haven't tested it all that thoroughly to make sure it doesn't have edge cases

@uranusjr
Copy link
Member

Tests are not happy 😢

@ashb
Copy link
Member Author

ashb commented May 27, 2022

Looking. I only checked the local executor tests before pushing

@ashb
Copy link
Member Author

ashb commented May 27, 2022

Error: Process completed with exit code 137.

Damn, thats OOM.

I wonder why this is using so much more memory.

@ashb
Copy link
Member Author

ashb commented May 27, 2022

Okay, another re-think needed.

This seems to work fine on 3.10, but on 3.7 it ends up spawning thousands of processes

@ashb
Copy link
Member Author

ashb commented May 27, 2022

Oh right, prior to Python 3.9 ProcessPoolExecutor precreates the specified number of worker processes.

So essentially this was a fork-bomb on Py 3.7 and 3.8.

@github-actions
Copy link

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Jul 25, 2022
@github-actions github-actions bot closed this Jul 31, 2022
@ashb ashb modified the milestones: Airflow 2.4.0, Airflow 2.4.1 Sep 14, 2022
@jedcunningham jedcunningham reopened this Sep 23, 2022
@jedcunningham jedcunningham removed this from the Airflow 2.4.1 milestone Sep 23, 2022
@github-actions github-actions bot removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Sep 24, 2022
@ashb ashb added the pinned Protect from Stalebot auto closing label Nov 7, 2022
@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Feb 21, 2023
@eladkal eladkal removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Feb 21, 2023
@ashb
Copy link
Member Author

ashb commented Nov 20, 2024

This branch name was "rewrite-local-exexc-concurrentfutures" (sic) as when originally opened in 2022 for 3.10 that was the plan.

However since then 3.12 has come out and it now starts issuing warnings when Fork and threads are used, and concurrent.futures uses a thread internally, so a different approach was used.

@ashb ashb force-pushed the rewrite-local-exexc-concurrentfutures branch from 1ccc14a to c7f2a3b Compare November 20, 2024 12:44
@ashb ashb requested a review from hussein-awala as a code owner November 20, 2024 12:44
@ashb ashb changed the title Re-write LocalExecutor workers to cleanly shutdown on 3.10 Rewite LocalExecutor to be simpler, and to shutdown cleanly on Python 3.10+ Nov 20, 2024
… 3.10+

Something changed between Python 3.7 and 3.10 meaning that a limited
parallelism LocalExecutor scheduler now doesn't shutdown cleanly on
receiving a signal.

On closer inspection of the limited vs unlimited path it apepars to me that
the code was "over-generalized" and the entire concept of `self.impl` has been
removed hopefully making this code much more direct and easier to understand.

The key things are now:

- When a task needs to be run, we send the message on a mp.SimpleQueue object,
  and increment an internal counter.

  (We use our own counter, not qsize method as that is not portable)

- Inside _check_workers we see if we think there are any outstanding messages,
  and create a worker if there are.

  The reason we do this is the on macOS (where the default mp start method is
  "spawn") a process will be started via `exeucte_async`, but it will take a
  second or two to pull the message of the queue, by which time the scheduler
  will have called `executor.sync()` again, meaning we'd over create workers
  (but never above the limit).

  Avoiding that case is why we keep the internal `_outstanding_messages`
  counter -- `self.activity_queue.empty()` would return False when the worker
  is booting up.

- Everytime the scheduler calls the `sync()` method we read out of the result
  queue and decrement the internal counter.

- We remove the entire use of `multiprocessing.Manager` -- it doesn't seem to
  do anything other than create queue objects but for our use it just adds
  complexity to understanding

- Almost as a side-effect we now only create worker subprocesses on demand,
  instead of pre-launching them.

  We do not currently shut down idle processes, though adding it would be
  quite straight forward if we wanted to in the future

This branch name was "rewrite-local-exexc-concurrentfutures" (sic) as when
originally opened in 2022 for 3.10 that was the plan.

However since then 3.12 has come out and it now starts issuing warnings when
Fork and threads are used, and concurrent.futures uses a thread internally, so
a different approach was used.
@ashb ashb force-pushed the rewrite-local-exexc-concurrentfutures branch from c7f2a3b to 392abf9 Compare November 20, 2024 13:11
@ashb
Copy link
Member Author

ashb commented Nov 20, 2024

Tests currently failing cos I moved setup out of start() into init(), but ExecutorLoader caches instances. I'll move it back.

airflow/executors/local_executor.py Outdated Show resolved Hide resolved
airflow/executors/local_executor.py Outdated Show resolved Hide resolved
airflow/executors/local_executor.py Outdated Show resolved Hide resolved
airflow/executors/local_executor.py Show resolved Hide resolved
airflow/executors/local_executor.py Show resolved Hide resolved
tests/executors/test_local_executor.py Outdated Show resolved Hide resolved
@ashb ashb merged commit 47cdb84 into apache:main Nov 20, 2024
45 checks passed
@ashb ashb deleted the rewrite-local-exexc-concurrentfutures branch November 20, 2024 17:04
LefterisXefteris pushed a commit to LefterisXefteris/airflow that referenced this pull request Jan 5, 2025
… 3.10+ (apache#23944)

Something changed between Python 3.7 and 3.10 meaning that a limited
parallelism LocalExecutor scheduler now doesn't shutdown cleanly on
receiving a signal.

On closer inspection of the limited vs unlimited path it apepars to me that
the code was "over-generalized" and the entire concept of `self.impl` has been
removed hopefully making this code much more direct and easier to understand.

The key things are now:

- When a task needs to be run, we send the message on a mp.SimpleQueue object,
  and increment an internal counter.

  (We use our own counter, not qsize method as that is not portable)

- Inside _check_workers we see if we think there are any outstanding messages,
  and create a worker if there are.

  The reason we do this is the on macOS (where the default mp start method is
  "spawn") a process will be started via `exeucte_async`, but it will take a
  second or two to pull the message of the queue, by which time the scheduler
  will have called `executor.sync()` again, meaning we'd over create workers
  (but never above the limit).

  Avoiding that case is why we keep the internal `_unread_messages`
  counter -- `self.activity_queue.empty()` would return False when the worker
  is booting up.

- We remove the entire use of `multiprocessing.Manager` -- it doesn't seem to
  do anything other than create queue objects but for our use it just adds
  complexity to understanding

- Almost as a side-effect we now only create worker subprocesses on demand,
  instead of pre-launching them.

  We do not currently shut down idle processes, though adding it would be
  quite straight forward if we wanted to in the future

This branch name was "rewrite-local-exexc-concurrentfutures" (sic) as when
originally opened in 2022 for 3.10 that was the plan.

However since then 3.12 has come out and it now starts issuing warnings when
Fork and threads are used, and concurrent.futures uses a thread internally, so
a different approach was used.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:Scheduler including HA (high availability) scheduler pinned Protect from Stalebot auto closing
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants