diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index c3efa3c9d5d2b..87a8e71d3589d 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -35,7 +35,6 @@
 
 from setproctitle import setproctitle
 
-from airflow import settings
 from airflow.executors import workloads
 from airflow.executors.base_executor import PARALLELISM, BaseExecutor
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -61,10 +60,6 @@ def _run_worker(
     log = logging.getLogger(logger_name)
     log.info("Worker starting up pid=%d", os.getpid())
 
-    # We know we've just started a new process, so lets disconnect from the metadata db now
-    settings.engine.pool.dispose()
-    settings.engine.dispose()
-
     while True:
         setproctitle("airflow worker -- LocalExecutor: ")
         try:
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 88fc6abb60952..a8b5ccced848a 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1061,14 +1061,16 @@ def _run_scheduler_loop(self) -> None:
                 with create_session() as session:
                     # This will schedule for as many executors as possible.
                     num_queued_tis = self._do_scheduling(session)
+                    # Don't keep any objects alive -- we've possibly just looked at 500+ ORM objects!
+                    session.expunge_all()
 
-                    # Heartbeat all executors, even if they're not receiving new tasks this loop. It will be
-                    # either a no-op, or they will check-in on currently running tasks and send out new
-                    # events to be processed below.
-                    for executor in self.job.executors:
-                        executor.heartbeat()
+                # Heartbeat all executors, even if they're not receiving new tasks this loop. It will be
+                # either a no-op, or they will check-in on currently running tasks and send out new
+                # events to be processed below.
+                for executor in self.job.executors:
+                    executor.heartbeat()
 
-                    session.expunge_all()
+                with create_session() as session:
                     num_finished_events = 0
                     for executor in self.job.executors:
                         num_finished_events += self._process_executor_events(
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 39ad05aebf968..36986a3a5f270 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -234,8 +234,6 @@ def block_orm_access():
         from airflow import settings
         from airflow.configuration import conf
 
-        settings.dispose_orm()
-
         for attr in ("engine", "async_engine", "Session", "AsyncSession", "NonScopedSession"):
             if hasattr(settings, attr):
                 delattr(settings, attr)
@@ -329,7 +327,7 @@ def exit(n: int) -> NoReturn:
         import traceback
 
         try:
-            last_chance_stderr.write("--- Last chance exception handler ---\n")
+            last_chance_stderr.write("--- Supervised process Last chance exception handler ---\n")
             traceback.print_exception(exc, value=v, tb=tb, file=last_chance_stderr)
             # Exit code 126 and 125 don't have any "special" meaning, they are only meant to serve as an
             # identifier that the task process died in a really odd way.