Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion airflow/cli/commands/local_commands/scheduler_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@

def _run_scheduler_job(args) -> None:
job_runner = SchedulerJobRunner(job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs)
ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor.__class__)
enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check):
run_job(job=job_runner.job, execute_callable=job_runner._execute)
Expand Down
119 changes: 11 additions & 108 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ class DagFileStat:
class DagParsingSignal(enum.Enum):
"""All signals sent to parser."""

AGENT_RUN_ONCE = "agent_run_once"
TERMINATE_MANAGER = "terminate_manager"
END_MANAGER = "end_manager"

Expand All @@ -118,7 +117,6 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
for unlimited.
:param processor_timeout: How long to wait before timing out a DAG file processor
:param dag_ids: if specified, only schedule tasks with these DAG IDs
:param async_mode: Whether to start agent in async mode
"""

def __init__(
Expand All @@ -127,14 +125,12 @@ def __init__(
max_runs: int,
processor_timeout: timedelta,
dag_ids: list[str] | None,
async_mode: bool,
):
super().__init__()
self._dag_directory: os.PathLike = dag_directory
self._max_runs = max_runs
self._processor_timeout = processor_timeout
self._dag_ids = dag_ids
self._async_mode = async_mode
# Map from file path to the processor
self._processors: dict[str, DagFileProcessorProcess] = {}
# Pipe for communicating signals
Expand All @@ -161,7 +157,6 @@ def start(self) -> None:
self._processor_timeout,
child_signal_conn,
self._dag_ids,
self._async_mode,
),
)
self._process = process
Expand All @@ -170,57 +165,19 @@ def start(self) -> None:

self.log.info("Launched DagFileProcessorManager with pid: %s", process.pid)

def run_single_parsing_loop(self) -> None:
"""
Send agent heartbeat signal to the manager, requesting that it runs one processing "loop".

Should only be used when launched DAG file processor manager in sync mode.

Call wait_until_finished to ensure that any launched processors have finished before continuing.
"""
if not self._parent_signal_conn or not self._process:
raise ValueError("Process not started.")
if not self._process.is_alive():
return

try:
self._parent_signal_conn.send(DagParsingSignal.AGENT_RUN_ONCE)
except ConnectionError:
# If this died cos of an error then we will noticed and restarted
# when harvest_serialized_dags calls _heartbeat_manager.
pass

def get_callbacks_pipe(self) -> MultiprocessingConnection:
"""Return the pipe for sending Callbacks to DagProcessorManager."""
if not self._parent_signal_conn:
raise ValueError("Process not started.")
return self._parent_signal_conn

def wait_until_finished(self) -> None:
"""Wait until DAG parsing is finished."""
if not self._parent_signal_conn:
raise ValueError("Process not started.")
if self._async_mode:
raise RuntimeError("wait_until_finished should only be called in sync_mode")
while self._parent_signal_conn.poll(timeout=None):
try:
result = self._parent_signal_conn.recv()
except EOFError:
return
self._process_message(result)
if isinstance(result, DagParsingStat):
# In sync mode (which is the only time we call this function) we don't send this message from
# the Manager until all the running processors have finished
return

@staticmethod
def _run_processor_manager(
dag_directory: os.PathLike,
max_runs: int,
processor_timeout: timedelta,
signal_conn: MultiprocessingConnection,
dag_ids: list[str] | None,
async_mode: bool,
) -> None:
# Make this process start as a new process group - that makes it easy
# to kill all sub-process of this at the OS-level, rather than having
Expand All @@ -241,7 +198,6 @@ def _run_processor_manager(
processor_timeout=processor_timeout,
dag_ids=dag_ids,
signal_conn=signal_conn,
async_mode=async_mode,
)
processor_manager.start()

Expand Down Expand Up @@ -352,7 +308,6 @@ class DagFileProcessorManager(LoggingMixin):
:param processor_timeout: How long to wait before timing out a DAG file processor
:param signal_conn: connection to communicate signal with processor agent.
:param dag_ids: if specified, only schedule tasks with these DAG IDs
:param async_mode: whether to start the manager in async mode
"""

def __init__(
Expand All @@ -362,7 +317,6 @@ def __init__(
processor_timeout: timedelta,
dag_ids: list[str] | None,
signal_conn: MultiprocessingConnection | None = None,
async_mode: bool = True,
):
super().__init__()
# known files; this will be updated every `dag_dir_list_interval` and stuff added/removed accordingly
Expand All @@ -372,30 +326,16 @@ def __init__(
# signal_conn is None for dag_processor_standalone mode.
self._direct_scheduler_conn = signal_conn
self._dag_ids = dag_ids
self._async_mode = async_mode
self._parsing_start_time: float | None = None
self._dag_directory = dag_directory
# Set the signal conn in to non-blocking mode, so that attempting to
# send when the buffer is full errors, rather than hangs for-ever
# attempting to send (this is to avoid deadlocks!)
#
# Don't do this in sync_mode, as we _need_ the DagParsingStat sent to
# continue the scheduler
if self._async_mode and self._direct_scheduler_conn is not None:
if self._direct_scheduler_conn:
os.set_blocking(self._direct_scheduler_conn.fileno(), False)

self.standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
self._parallelism = conf.getint("scheduler", "parsing_processes")
if (
conf.get_mandatory_value("database", "sql_alchemy_conn").startswith("sqlite")
and self._parallelism > 1
):
self.log.warning(
"Because we cannot use more than 1 thread (parsing_processes = "
"%d) when using sqlite. So we set parallelism to 1.",
self._parallelism,
)
self._parallelism = 1

# Parse and schedule each file no faster than this interval.
self._file_process_interval = conf.getint("scheduler", "min_file_process_interval")
Expand Down Expand Up @@ -531,20 +471,13 @@ def deactivate_stale_dags(
cls.logger().info("Deactivated %i DAGs which are no longer present in file.", deactivated)

def _run_parsing_loop(self):
# In sync mode we want timeout=None -- wait forever until a message is received
if self._async_mode:
poll_time = 0.0
else:
poll_time = None
poll_time = 0.0

self._refresh_dag_dir()
self.prepare_file_path_queue()
max_callbacks_per_loop = conf.getint("scheduler", "max_callbacks_per_loop")

if self._async_mode:
# If we're in async mode, we can start up straight away. If we're
# in sync mode we need to be told to start a "loop"
self.start_new_processes()
self.start_new_processes()
while True:
with Trace.start_span(span_name="dag_parsing_loop", component="DagFileProcessorManager") as span:
loop_start_time = time.monotonic()
Expand All @@ -557,36 +490,16 @@ def _run_parsing_loop(self):

self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
if span.is_recording():
span.add_event(name="terminate")
self.terminate()
break
elif agent_signal == DagParsingSignal.END_MANAGER:
if span.is_recording():
span.add_event(name="end")
self.end()
sys.exit(os.EX_OK)
elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE:
# continue the loop to parse dags
pass
elif isinstance(agent_signal, CallbackRequest):
self._add_callback_to_queue(agent_signal)
else:
raise ValueError(f"Invalid message {type(agent_signal)}")

if not ready and not self._async_mode:
# In "sync" mode we don't want to parse the DAGs until we
# are told to (as that would open another connection to the
# SQLite DB which isn't a good practice

# This shouldn't happen, as in sync mode poll should block for
# ever. Lets be defensive about that.
self.log.warning(
"wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time
)

continue

for sentinel in ready:
if sentinel is not self._direct_scheduler_conn:
processor = self.waitables.get(sentinel)
Expand Down Expand Up @@ -631,14 +544,6 @@ def _run_parsing_loop(self):
# Update number of loop iteration.
self._num_run += 1

if not self._async_mode:
self.log.debug("Waiting for processors to finish since we're using sqlite")
# Wait until the running DAG processors are finished before
# sending a DagParsingStat message back. This means the Agent
# can tell we've got to the end of this iteration when it sees
# this type of message
self.wait_until_finished()

# Collect anything else that has finished, but don't kick off any more processors
if span.is_recording():
span.add_event(name="collect_results")
Expand All @@ -664,10 +569,9 @@ def _run_parsing_loop(self):
except BlockingIOError:
# Try again next time around the loop!

# It is better to fail, than it is deadlock. This should
# "almost never happen" since the DagParsingStat object is
# small, and in async mode this stat is not actually _required_
# for normal operation (It only drives "max runs")
# It is better to fail, than it is deadlock. This should "almost never happen" since the
# DagParsingStat object is small, and is not actually _required_ for normal operation (It
# only drives "max runs")
self.log.debug("BlockingIOError received trying to send DagParsingStat, ignoring")

if max_runs_reached:
Expand All @@ -683,12 +587,11 @@ def _run_parsing_loop(self):
)
break

if self._async_mode:
loop_duration = time.monotonic() - loop_start_time
if loop_duration < 1:
poll_time = 1 - loop_duration
else:
poll_time = 0.0
loop_duration = time.monotonic() - loop_start_time
if loop_duration < 1:
poll_time = 1 - loop_duration
else:
poll_time = 0.0

@classmethod
@provide_session
Expand Down
1 change: 0 additions & 1 deletion airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ class BaseExecutor(LoggingMixin):
supports_sentry: bool = False

is_local: bool = False
is_single_threaded: bool = False
is_production: bool = True

change_sensor_mode_to_reschedule: bool = False
Expand Down
1 change: 0 additions & 1 deletion airflow/executors/debug_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class DebugExecutor(BaseExecutor):

_terminated = threading.Event()

is_single_threaded: bool = True
is_production: bool = False

change_sensor_mode_to_reschedule: bool = True
Expand Down
49 changes: 4 additions & 45 deletions airflow/executors/executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

from __future__ import annotations

import functools
import logging
import os
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowConfigException, UnknownExecutorException
Expand Down Expand Up @@ -236,68 +234,29 @@ def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor
return executor

@classmethod
def import_executor_cls(
cls, executor_name: ExecutorName, validate: bool = True
) -> tuple[type[BaseExecutor], ConnectorSource]:
def import_executor_cls(cls, executor_name: ExecutorName) -> tuple[type[BaseExecutor], ConnectorSource]:
"""
Import the executor class.

Supports the same formats as ExecutorLoader.load_executor.

:param executor_name: Name of core executor or module path to executor.
:param validate: Whether or not to validate the executor before returning

:return: executor class via executor_name and executor import source
"""

def _import_and_validate(path: str) -> type[BaseExecutor]:
executor = import_string(path)
if validate:
cls.validate_database_executor_compatibility(executor)
return executor

return _import_and_validate(executor_name.module_path), executor_name.connector_source
return import_string(executor_name.module_path), executor_name.connector_source

@classmethod
def import_default_executor_cls(cls, validate: bool = True) -> tuple[type[BaseExecutor], ConnectorSource]:
def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSource]:
"""
Import the default executor class.

:param validate: Whether or not to validate the executor before returning

:return: executor class and executor import source
"""
executor_name = cls.get_default_executor_name()
executor, source = cls.import_executor_cls(executor_name, validate=validate)
executor, source = cls.import_executor_cls(executor_name)
return executor, source

@classmethod
@functools.cache
def validate_database_executor_compatibility(cls, executor: type[BaseExecutor]) -> None:
"""
Validate database and executor compatibility.

Most of the databases work universally, but SQLite can only work with
single-threaded executors (e.g. Sequential).

This is NOT done in ``airflow.configuration`` (when configuration is
initialized) because loading the executor class is heavy work we want to
avoid unless needed.
"""
# Single threaded executors can run with any backend.
if executor.is_single_threaded:
return

# This is set in tests when we want to be able to use SQLite.
if os.environ.get("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK") == "1":
return

from airflow.settings import engine

# SQLite only works with single threaded executors
if engine.dialect.name == "sqlite":
raise AirflowConfigException(f"error: cannot use SQLite with the {executor.__name__}")

@classmethod
def __load_celery_kubernetes_executor(cls) -> BaseExecutor:
celery_executor = import_string(cls.executors[CELERY_EXECUTOR])()
Expand Down
1 change: 0 additions & 1 deletion airflow/executors/sequential_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ class SequentialExecutor(BaseExecutor):
"""

is_local: bool = True
is_single_threaded: bool = True
is_production: bool = False

serve_logs: bool = True
Expand Down
Loading