Completed D400 for multiple folders (#27767)
bdsoha authored Nov 27, 2022
1 parent 2242ea4 commit 70a9980
Showing 12 changed files with 185 additions and 165 deletions.
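For context, pydocstyle's D400 rule ("First line should end with a period") is what drives the docstring edits in the hunks below. A minimal before/after sketch of the pattern follows (the function name is invented for illustration, not taken from this diff); assuming a recent pydocstyle release, a command along the lines of `pydocstyle --select=D400 airflow/` can be used to confirm the fixes.

# Before -- flagged by D400: the docstring's summary line has no trailing period.
def load_config():
    """Load the Airflow configuration"""


# After -- compliant: the summary line ends with a period.
def load_config():
    """Load the Airflow configuration."""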
5 changes: 3 additions & 2 deletions airflow/__init__.py
@@ -16,10 +16,11 @@
# specific language governing permissions and limitations
# under the License.
"""
Init setup.
Authentication is implemented using flask_login and different environments can
implement their own login mechanisms by providing an `airflow_login` module
in their PYTHONPATH. airflow_login should be based off the
`airflow.www.login`
in their PYTHONPATH. airflow_login should be based off the `airflow.www.login`
isort:skip_file
"""
4 changes: 2 additions & 2 deletions airflow/__main__.py
@@ -17,7 +17,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Main executable module"""
"""Main executable module."""
from __future__ import annotations

import os
@@ -29,7 +29,7 @@


def main():
"""Main executable function"""
"""Main executable function."""
if conf.get("core", "security") == "kerberos":
os.environ["KRB5CCNAME"] = conf.get("kerberos", "ccache")
os.environ["KRB5_KTNAME"] = conf.get("kerberos", "keytab")
74 changes: 33 additions & 41 deletions airflow/dag_processing/manager.py
@@ -66,14 +66,14 @@


class DagParsingStat(NamedTuple):
"""Information on processing progress"""
"""Information on processing progress."""

done: bool
all_files_processed: bool


class DagFileStat(NamedTuple):
"""Information about single processing of one file"""
"""Information about single processing of one file."""

num_dags: int
import_errors: int
@@ -92,10 +92,11 @@ class DagParsingSignal(enum.Enum):

class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
"""
Agent for DAG file processing. It is responsible for all DAG parsing
related jobs in scheduler process. Mainly it can spin up DagFileProcessorManager
in a subprocess, collect DAG parsing results from it and communicate
signal/DAG parsing stat with it.
Agent for DAG file processing.
It is responsible for all DAG parsing related jobs in scheduler process.
Mainly it can spin up DagFileProcessorManager in a subprocess,
collect DAG parsing results from it and communicate signal/DAG parsing stat with it.
This class runs in the main `airflow scheduler` process.
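As a rough illustration of the agent/manager split described above, here is a minimal generic sketch of the pattern: a parent process spawning a worker via multiprocessing and polling a pipe for its stats. This is not the actual Airflow implementation, and every name in it is invented for illustration.

import multiprocessing
from typing import NamedTuple


class ParsingStat(NamedTuple):
    """Toy stand-in for the parsing-progress stat the manager reports back."""

    done: bool
    all_files_processed: bool


def manager_loop(conn) -> None:
    # Child process: pretend to parse DAG files, then report progress to the parent.
    conn.send(ParsingStat(done=True, all_files_processed=True))
    conn.close()


if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    proc = multiprocessing.Process(target=manager_loop, args=(child_conn,))
    proc.start()

    # Parent ("agent") side: poll the pipe for stats, much as heartbeat() does below.
    if parent_conn.poll(5):
        stat = parent_conn.recv()
        print(f"done={stat.done}, all_files_processed={stat.all_files_processed}")
    proc.join()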
@@ -257,7 +258,7 @@ def _run_processor_manager(
processor_manager.start()

def heartbeat(self) -> None:
"""Check if the DagFileProcessorManager process is alive, and process any pending messages"""
"""Check if the DagFileProcessorManager process is alive, and process any pending messages."""
if not self._parent_signal_conn:
raise ValueError("Process not started.")
# Receive any pending messages before checking if the process has exited.
@@ -314,19 +315,16 @@ def _sync_metadata(self, stat):

@property
def done(self) -> bool:
"""Has DagFileProcessorManager ended?"""
"""Whether the DagFileProcessorManager finished."""
return self._done

@property
def all_files_processed(self):
"""Have all files been processed at least once?"""
"""Whether all files been processed at least once."""
return self._all_files_processed

def terminate(self):
"""
Send termination signal to DAG parsing processor manager
and expect it to terminate all DAG file processors.
"""
"""Send termination signal to DAG parsing processor manager to terminate all DAG file processors."""
if self._process and self._process.is_alive():
self.log.info("Sending termination message to manager.")
try:
@@ -335,10 +333,7 @@ def terminate(self):
pass

def end(self):
"""
Terminate (and then kill) the manager process launched.
:return:
"""
"""Terminate (and then kill) the manager process launched."""
if not self._process:
self.log.warning("Ending without manager process.")
return
@@ -352,6 +347,8 @@ def end(self):

class DagFileProcessorManager(LoggingMixin):
"""
Manage processes responsible for parsing DAGs.
Given a list of DAG definition files, this kicks off several processors
in parallel to process them and put the results to a multiprocessing.Queue
for DagFileProcessorAgent to harvest. The parallelism is limited and as the
@@ -454,7 +451,7 @@ def __init__(
)

def register_exit_signals(self):
"""Register signals that stop child processes"""
"""Register signals that stop child processes."""
signal.signal(signal.SIGINT, self._exit_gracefully)
signal.signal(signal.SIGTERM, self._exit_gracefully)
# So that we ignore the debug dump signal, making it easier to send
@@ -471,10 +468,9 @@ def _exit_gracefully(self, signum, frame):

def start(self):
"""
Use multiple processes to parse and generate tasks for the
DAGs in parallel. By processing them in separate processes,
we can get parallelism and isolation from potentially harmful
user code.
Use multiple processes to parse and generate tasks for the DAGs in parallel.
By processing them in separate processes, we can get parallelism and isolation
from potentially harmful user code.
"""
self.register_exit_signals()

@@ -491,7 +487,7 @@ def start(self):
@provide_session
def _deactivate_stale_dags(self, session=None):
"""
Detects DAGs which are no longer present in files
Detects DAGs which are no longer present in files.
Deactivate them and remove them in the serialized_dag table
"""
@@ -536,7 +532,6 @@ def _deactivate_stale_dags(self, session=None):
self.last_deactivate_stale_dags_time = timezone.utcnow()

def _run_parsing_loop(self):

# In sync mode we want timeout=None -- wait forever until a message is received
if self._async_mode:
poll_time = 0.0
@@ -687,7 +682,6 @@ def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION):
guard.commit()

def _add_callback_to_queue(self, request: CallbackRequest):

# requests are sent by dag processors. SLAs exist per-dag, but can be generated once per SLA-enabled
# task in the dag. If treated like other callbacks, SLAs can cause feedback where a SLA arrives,
# goes to the front of the queue, gets processed, triggers more SLAs from the same DAG, which go to
@@ -766,7 +760,7 @@ def _refresh_dag_dir(self):
return False

def _print_stat(self):
"""Occasionally print out stats about how fast the files are getting processed"""
"""Occasionally print out stats about how fast the files are getting processed."""
if 0 < self.print_stats_interval < time.monotonic() - self.last_stat_print_time:
if self._file_paths:
self._log_file_processing_stats(self._file_paths)
@@ -852,9 +846,8 @@ def _log_file_processing_stats(self, known_file_paths):

def get_pid(self, file_path) -> int | None:
"""
Retrieve the PID of the process processing the given file or None if the file is not being processed.
:param file_path: the path to the file that's being processed
:return: the PID of the process processing the given file or None if
the specified file is not being processed
"""
if file_path in self._processors:
return self._processors[file_path].pid
@@ -870,6 +863,7 @@ def get_all_pids(self) -> list[int]:

def get_last_runtime(self, file_path) -> float | None:
"""
Retrieve the last processing time of a specific path.
:param file_path: the path to the file that was processed
:return: the runtime (in seconds) of the process of the last run, or
None if the file was never processed.
@@ -879,6 +873,7 @@ def get_last_runtime(self, file_path) -> float | None:

def get_last_dag_count(self, file_path) -> int | None:
"""
Retrieve the total DAG count at a specific path.
:param file_path: the path to the file that was processed
:return: the number of dags loaded from that file, or None if the file
was never processed.
@@ -888,6 +883,7 @@ def get_last_dag_count(self, file_path) -> int | None:

def get_last_error_count(self, file_path) -> int | None:
"""
Retrieve the total number of errors from processing a specific path.
:param file_path: the path to the file that was processed
:return: the number of import errors from processing, or None if the file
was never processed.
@@ -897,6 +893,7 @@ def get_last_error_count(self, file_path) -> int | None:

def get_last_finish_time(self, file_path) -> datetime | None:
"""
Retrieve the last completion time for processing a specific path.
:param file_path: the path to the file that was processed
:return: the finish time of the process of the last run, or None if the
file was never processed.
@@ -906,6 +903,7 @@ def get_last_finish_time(self, file_path) -> datetime | None:

def get_start_time(self, file_path) -> datetime | None:
"""
Retrieve the last start time for processing a specific path.
:param file_path: the path to the file that's being processed
:return: the start time of the process that's processing the
specified file or None if the file is not currently being processed
@@ -916,8 +914,8 @@ def get_run_count(self, file_path) -> int:

def get_run_count(self, file_path) -> int:
"""
The number of times the given file has been parsed.
:param file_path: the path to the file that's being processed
:return: the number of times the given file has been parsed
"""
stat = self._file_stats.get(file_path)
return stat.run_count if stat else 0
@@ -984,7 +982,7 @@ def _collect_results_from_processor(self, processor) -> None:
Stats.timing(f"dag_processing.last_duration.{file_name}", last_duration)

def collect_results(self) -> None:
"""Collect the result from any finished DAG processors"""
"""Collect the result from any finished DAG processors."""
ready = multiprocessing.connection.wait(
self.waitables.keys() - [self._direct_scheduler_conn], timeout=0
)
@@ -1013,7 +1011,7 @@ def _create_process(file_path, pickle_dags, dag_ids, dag_directory, callback_req
)

def start_new_processes(self):
"""Start more processors if we have enough slots and files to process"""
"""Start more processors if we have enough slots and files to process."""
while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
file_path = self._file_path_queue.popleft()
# Stop creating duplicate processor i.e. processor with the same filepath
@@ -1157,7 +1155,7 @@ def _kill_timed_out_processors(self):
self._processors.pop(proc)

def max_runs_reached(self):
""":return: whether all file paths have been processed max_runs times"""
""":return: whether all file paths have been processed max_runs times."""
if self._max_runs == -1: # Unlimited runs.
return False
for stat in self._file_stats.values():
@@ -1168,26 +1166,20 @@ def max_runs_reached(self):
return True

def terminate(self):
"""
Stops all running processors
:return: None
"""
"""Stops all running processors."""
for processor in self._processors.values():
Stats.decr("dag_processing.processes")
processor.terminate()

def end(self):
"""
Kill all child processes on exit since we don't want to leave
them as orphaned.
"""
"""Kill all child processes on exit since we don't want to leave them as orphaned."""
pids_to_kill = self.get_all_pids()
if pids_to_kill:
kill_child_processes_by_pids(pids_to_kill)

def emit_metrics(self):
"""
Emit metrics about dag parsing summary
Emit metrics about dag parsing summary.
This is called once every time around the parsing "loop" - i.e. after
all files have been parsed.
21 changes: 11 additions & 10 deletions airflow/dag_processing/processor.py
@@ -60,7 +60,8 @@


class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
"""Runs DAG processing in a separate process using DagFileProcessor
"""
Runs DAG processing in a separate process using DagFileProcessor.
:param file_path: a Python file containing Airflow DAG definitions
:param pickle_dags: whether to serialize the DAG objects to the DB
@@ -266,7 +267,7 @@ def pid(self) -> int:
@property
def exit_code(self) -> int | None:
"""
After the process is finished, this can be called to get the return code
After the process is finished, this can be called to get the return code.
:return: the exit code of the process
"""
@@ -367,8 +368,9 @@ def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.L
@provide_session
def manage_slas(self, dag: DAG, session: Session = None) -> None:
"""
Finding all tasks that have SLAs defined, and sending alert emails
where needed. New SLA misses are also recorded in the database.
Finding all tasks that have SLAs defined, and sending alert emails when needed.
New SLA misses are also recorded in the database.
We are assuming that the scheduler runs often, so we only check for
tasks that should have succeeded in the past hour.
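For orientation, the SLA machinery serviced here is something DAG authors opt into with a task-level `sla` and a DAG-level `sla_miss_callback`. A hedged sketch of such a DAG follows (the dag_id, task_id, and timings are illustrative; the callback signature follows the Airflow 2.x documentation).

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator


def notify_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    # Invoked when new SLA misses are recorded (see manage_slas above).
    print(f"SLA missed in {dag.dag_id}: {task_list}")


with DAG(
    dag_id="sla_example",
    start_date=datetime(2022, 1, 1),
    schedule="@hourly",
    catchup=False,
    sla_miss_callback=notify_sla_miss,
) as sla_dag:
    # The task must finish within 30 minutes of its scheduled time, otherwise an
    # SLA miss is recorded in the database and alert emails/callbacks are sent.
    EmptyOperator(task_id="quick_task", sla=timedelta(minutes=30))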
@@ -524,6 +526,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
@staticmethod
def update_import_errors(session: Session, dagbag: DagBag) -> None:
"""
Update any import errors to be displayed in the UI.
For the DAGs in the given DagBag, record any associated import errors and clears
errors for files that no longer have them. These are usually displayed through the
Airflow UI so that users know that there are issues parsing DAGs.
@@ -564,10 +567,7 @@ def update_import_errors(session: Session, dagbag: DagBag) -> None:

@provide_session
def _validate_task_pools(self, *, dagbag: DagBag, session: Session = NEW_SESSION):
"""
Validates and raise exception if any task in a dag is using a non-existent pool
:meta private:
"""
"""Validates and raise exception if any task in a dag is using a non-existent pool."""
from airflow.models.pool import Pool

def check_pools(dag):
@@ -590,6 +590,7 @@ def check_pools(dag):

def update_dag_warnings(self, *, session: Session, dagbag: DagBag) -> None:
"""
Update any import warnings to be displayed in the UI.
For the DAGs in the given DagBag, record any associated configuration warnings and clear
warnings for files that no longer have them. These are usually displayed through the
Airflow UI so that users know that there are issues parsing DAGs.
@@ -616,8 +617,8 @@ def execute_callbacks(
self, dagbag: DagBag, callback_requests: list[CallbackRequest], session: Session = NEW_SESSION
) -> None:
"""
Execute on failure callbacks. These objects can come from SchedulerJob or from
DagFileProcessorManager.
Execute on failure callbacks.
These objects can come from SchedulerJob or from DagFileProcessorManager.
:param dagbag: Dag Bag of dags
:param callback_requests: failure callbacks to execute
Expand Down
4 changes: 2 additions & 2 deletions airflow/logging_config.py
Expand Up @@ -29,7 +29,7 @@


def configure_logging():
"""Configure & Validate Airflow Logging"""
"""Configure & Validate Airflow Logging."""
logging_class_path = ""
try:
logging_class_path = conf.get("logging", "logging_config_class")
@@ -79,7 +79,7 @@ def configure_logging():


def validate_logging_config(logging_config):
"""Validate the provided Logging Config"""
"""Validate the provided Logging Config."""
# Now lets validate the other logging-related settings
task_log_reader = conf.get("logging", "task_log_reader")

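As a usage sketch for the option read above: configure_logging() imports whatever dict-config object `[logging] logging_config_class` points to, and validate_logging_config() then sanity-checks related settings. A minimal hedged example of such a module follows (the module name my_log_config and the tweaked logger key are illustrative; the dict layout assumes Airflow's shipped DEFAULT_LOGGING_CONFIG).

# my_log_config.py -- must be importable from PYTHONPATH (the module name is illustrative).
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
# Example tweak: more verbose task logs (assumes the "airflow.task" logger key
# present in the shipped default config).
LOGGING_CONFIG["loggers"]["airflow.task"]["level"] = "DEBUG"

# Then point Airflow at it in airflow.cfg (illustrative):
# [logging]
# logging_config_class = my_log_config.LOGGING_CONFIG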