airflow/executors/local_executor.py (5 changes: 0 additions & 5 deletions)
@@ -35,7 +35,6 @@

from setproctitle import setproctitle

from airflow import settings
from airflow.executors import workloads
from airflow.executors.base_executor import PARALLELISM, BaseExecutor
from airflow.utils.session import NEW_SESSION, provide_session
@@ -61,10 +60,6 @@ def _run_worker(
log = logging.getLogger(logger_name)
log.info("Worker starting up pid=%d", os.getpid())

# We know we've just started a new process, so lets disconnect from the metadata db now
settings.engine.pool.dispose()
settings.engine.dispose()

while True:
setproctitle("airflow worker -- LocalExecutor: <idle>")
try:
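For context, the lines removed above disposed of the SQLAlchemy engine pool right after a worker process started. A generic sketch of that dispose-after-fork pattern follows, written against plain SQLAlchemy rather than Airflow's settings module; the DSN, function body, and process setup here are made up for illustration and are not the executor's real code.

import multiprocessing

from sqlalchemy import create_engine, text

# Hypothetical DSN, for illustration only.
engine = create_engine("postgresql+psycopg2://user:***@localhost/airflow")

def _run_worker() -> None:
    # A freshly forked process should not reuse pooled connections inherited
    # from its parent, so it drops the pool; new connections are opened lazily.
    engine.dispose()
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))

if __name__ == "__main__":
    # Only meaningful on fork-based start methods (Unix), where the child
    # inherits the parent's connection pool.
    multiprocessing.set_start_method("fork", force=True)
    multiprocessing.Process(target=_run_worker).start()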
airflow/jobs/scheduler_job_runner.py (14 changes: 8 additions & 6 deletions)
@@ -1061,14 +1061,16 @@ def _run_scheduler_loop(self) -> None:
with create_session() as session:
# This will schedule for as many executors as possible.
num_queued_tis = self._do_scheduling(session)
# Don't keep any objects alive -- we've possibly just looked at 500+ ORM objects!
session.expunge_all()

# Heartbeat all executors, even if they're not receiving new tasks this loop. It will be
# either a no-op, or they will check-in on currently running tasks and send out new
# events to be processed below.
for executor in self.job.executors:
executor.heartbeat()
# Heartbeat all executors, even if they're not receiving new tasks this loop. It will be
# either a no-op, or they will check-in on currently running tasks and send out new
# events to be processed below.
for executor in self.job.executors:
executor.heartbeat()

session.expunge_all()
with create_session() as session:
num_finished_events = 0
for executor in self.job.executors:
num_finished_events += self._process_executor_events(
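The hunk above keeps the executor heartbeats and the final session.expunge_all() inside the scheduling session rather than after it. A simplified, self-contained sketch of that ordering is below; it is not Airflow's actual scheduler code, and the in-memory engine, create_session helper, and executor/do_scheduling stand-ins are assumptions made for the sketch.

from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("sqlite://")  # stand-in for the metadata database
Session = sessionmaker(bind=engine)

@contextmanager
def create_session():
    # Commit on success, roll back on error, always close -- mirrors the usual
    # session-helper pattern the loop relies on.
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

def run_scheduler_loop_once(executors, do_scheduling) -> int:
    with create_session() as session:
        # Schedule for as many executors as possible.
        num_queued_tis = do_scheduling(session)

        # Heartbeat every executor while the session is still open; this is
        # either a no-op or a check-in on currently running tasks.
        for executor in executors:
            executor.heartbeat()

        # Don't keep any objects alive -- scheduling may have loaded 500+ ORM objects.
        session.expunge_all()
    return num_queued_tis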
task-sdk/src/airflow/sdk/execution_time/supervisor.py (4 changes: 1 addition & 3 deletions)
@@ -234,8 +234,6 @@ def block_orm_access():
from airflow import settings
from airflow.configuration import conf

settings.dispose_orm()

for attr in ("engine", "async_engine", "Session", "AsyncSession", "NonScopedSession"):
if hasattr(settings, attr):
delattr(settings, attr)
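After this change, block_orm_access no longer calls settings.dispose_orm(); what remains is the guard above, which strips the ORM entry points off the settings module so a supervised process cannot accidentally reuse the parent's database connections. A minimal standalone sketch of that guard, using a stand-in namespace instead of airflow.settings:

from types import SimpleNamespace

# Stand-in for airflow.settings; the real module holds live engine/session objects.
settings = SimpleNamespace(engine=object(), Session=object(), NonScopedSession=object())

def block_orm_access(settings_module) -> None:
    # Delete every ORM entry point that happens to exist, so any later access
    # raises AttributeError instead of silently touching the metadata database.
    for attr in ("engine", "async_engine", "Session", "AsyncSession", "NonScopedSession"):
        if hasattr(settings_module, attr):
            delattr(settings_module, attr)

block_orm_access(settings)
assert not hasattr(settings, "engine")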
@@ -329,7 +327,7 @@ def exit(n: int) -> NoReturn:
import traceback

try:
last_chance_stderr.write("--- Last chance exception handler ---\n")
last_chance_stderr.write("--- Supervised process Last chance exception handler ---\n")
traceback.print_exception(exc, value=v, tb=tb, file=last_chance_stderr)
# Exit code 126 and 125 don't have any "special" meaning, they are only meant to serve as an
# identifier that the task process died in a really odd way.
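The second hunk only changes the message written by the last-chance exception handler. As a rough sketch of what such a handler looks like in isolation (the stream setup is simplified to sys.stderr, the handler name is invented, and 126 is used only because the surrounding comment describes it as a marker for an odd death, not a special code):

import os
import sys
import traceback

# The real supervisor writes to a separately preserved stderr stream; plain
# sys.stderr stands in for it here.
last_chance_stderr = sys.stderr

def last_chance_handler(exc_type, value, tb) -> None:
    try:
        last_chance_stderr.write("--- Supervised process Last chance exception handler ---\n")
        traceback.print_exception(exc_type, value=value, tb=tb, file=last_chance_stderr)
    finally:
        # Bypass normal interpreter shutdown; the parent only needs the exit code.
        os._exit(126)

# Install it as the process-wide fallback for otherwise-unhandled exceptions.
sys.excepthook = last_chance_handler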