Merged
1 change: 0 additions & 1 deletion .github/boring-cyborg.yml
@@ -357,7 +357,6 @@ labelPRBasedOnFilePath:

area:Scheduler:
- airflow/jobs/scheduler_job_runner.py
- airflow/task/standard_task_runner.py
- docs/apache-airflow/administration-and-deployment/scheduler.rst
- tests/jobs/test_scheduler_job.py

72 changes: 2 additions & 70 deletions airflow/cli/cli_config.py
@@ -544,51 +544,8 @@ def string_lower_type(val):
ARG_KERBEROS_ONE_TIME_MODE = Arg(
("-o", "--one-time"), help="Run airflow kerberos one time instead of forever", action="store_true"
)
# run
ARG_INTERACTIVE = Arg(
("-N", "--interactive"),
help="Do not capture standard output and error streams (useful for interactive debugging)",
action="store_true",
)
# TODO(aoen): "force" is a poor choice of name here since it implies it overrides
# all dependencies (not just past success), e.g. the ignore_depends_on_past
# dependency. This flag should be deprecated and renamed to 'ignore_ti_state' and
# the "ignore_all_dependencies" command should be called the"force" command
# instead.
ARG_FORCE = Arg(
("-f", "--force"),
help="Ignore previous task instance state, rerun regardless if task already succeeded/failed",
action="store_true",
)
ARG_RAW = Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true")
ARG_IGNORE_ALL_DEPENDENCIES = Arg(
("-A", "--ignore-all-dependencies"),
help="Ignores all non-critical dependencies, including ignore_ti_state and ignore_task_deps",
action="store_true",
)
# TODO(aoen): ignore_dependencies is a poor choice of name here because it is too
# vague (e.g. a task being in the appropriate state to be run is also a dependency
# but is not ignored by this flag), the name 'ignore_task_dependencies' is
# slightly better (as it ignores all dependencies that are specific to the task),
# so deprecate the old command name and use this instead.
ARG_IGNORE_DEPENDENCIES = Arg(
("-i", "--ignore-dependencies"),
help="Ignore task-specific dependencies, e.g. upstream, depends_on_past, and retry delay dependencies",
action="store_true",
)
ARG_DEPENDS_ON_PAST = Arg(
("-d", "--depends-on-past"),
help="Determine how Airflow should deal with past dependencies. The default action is `check`, Airflow "
"will check if the past dependencies are met for the tasks having `depends_on_past=True` before run "
"them, if `ignore` is provided, the past dependencies will be ignored, if `wait` is provided and "
"`depends_on_past=True`, Airflow will wait the past dependencies until they are met before running or "
"skipping the task",
choices={"check", "ignore", "wait"},
default="check",
)
ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead of airflow.cfg")
# tasks
ARG_MAP_INDEX = Arg(("--map-index",), type=int, default=-1, help="Mapped task index")
ARG_READ_FROM_DB = Arg(("--read-from-db",), help="Read dag from DB instead of dag file", action="store_true")


# database
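
Note: the three `--depends-on-past` choices above were consumed downstream as two boolean
dependency flags (the real call sites appear in the task_command.py hunks below, via
`should_ignore_depends_on_past` and a direct `== "wait"` check). A minimal sketch of that
mapping; the helper name here is hypothetical and the `ignore` branch is a simplification:

def _depends_on_past_flags(depends_on_past: str) -> dict[str, bool]:
    # "ignore" skips the past-success check entirely; "wait" blocks until
    # past dependencies are met before running or skipping the task.
    return {
        "ignore_depends_on_past": depends_on_past == "ignore",
        "wait_for_past_depends_before_skipping": depends_on_past == "wait",
    }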
@@ -838,7 +795,7 @@ def string_lower_type(val):
# jobs check
ARG_JOB_TYPE_FILTER = Arg(
("--job-type",),
choices=("LocalTaskJob", "SchedulerJob", "TriggererJob", "DagProcessorJob"),
choices=("SchedulerJob", "TriggererJob", "DagProcessorJob"),
action="store",
help="The type of job(s) that will be checked.",
)
@@ -1253,31 +1210,6 @@ class GroupCommand(NamedTuple):
ARG_MAP_INDEX,
),
),
ActionCommand(
name="run",
help="Run a single task instance",
func=lazy_load_command("airflow.cli.commands.remote_commands.task_command.task_run"),
args=(
ARG_DAG_ID,
ARG_TASK_ID,
ARG_LOGICAL_DATE_OR_RUN_ID,
ARG_SUBDIR,
ARG_MARK_SUCCESS,
ARG_FORCE,
ARG_POOL,
ARG_CFG_PATH,
ARG_LOCAL,
ARG_RAW,
ARG_IGNORE_ALL_DEPENDENCIES,
ARG_IGNORE_DEPENDENCIES,
ARG_DEPENDS_ON_PAST,
ARG_INTERACTIVE,
ARG_SHUT_DOWN_LOGGING,
ARG_MAP_INDEX,
ARG_VERBOSE,
ARG_READ_FROM_DB,
),
),
ActionCommand(
name="test",
help="Test a task instance",
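
Both the removed `run` command and the surviving `test` command resolve their handlers
through `lazy_load_command`, which defers the import of the command function until the
command is actually invoked. A minimal sketch of that helper, assuming it matches the
upstream implementation in cli_config.py:

from typing import Callable

from airflow.utils.module_loading import import_string


def lazy_load_command(import_path: str) -> Callable:
    """Create a callable that imports the real command function on first use."""
    _, _, name = import_path.rpartition(".")

    def command(*args, **kwargs):
        func = import_string(import_path)  # import deferred until first invocation
        return func(*args, **kwargs)

    command.__name__ = name
    return command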
248 changes: 3 additions & 245 deletions airflow/cli/commands/remote_commands/task_command.py
@@ -24,42 +24,28 @@
import json
import logging
import os
import sys
import textwrap
from collections.abc import Generator
from contextlib import contextmanager, redirect_stderr, redirect_stdout, suppress
from pathlib import Path
from contextlib import redirect_stdout
from typing import TYPE_CHECKING, Protocol, cast

from airflow import settings
from airflow.cli.simple_table import AirflowConsole
from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
from airflow.configuration import conf
from airflow.exceptions import AirflowException, DagRunNotFound, TaskDeferred, TaskInstanceNotFound
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.job import Job, run_job
from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
from airflow.listeners.listener import get_listener_manager
from airflow.exceptions import DagRunNotFound, TaskDeferred, TaskInstanceNotFound
from airflow.models import TaskInstance
from airflow.models.dag import DAG, _run_inline_trigger
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskReturnCode
from airflow.sdk.definitions.param import ParamsDict
from airflow.sdk.execution_time.secrets_masker import RedactedIO
from airflow.settings import IS_EXECUTOR_CONTAINER, IS_K8S_EXECUTOR_POD
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
from airflow.utils import cli as cli_utils, timezone
from airflow.utils.cli import (
get_dag,
get_dag_by_file_location,
get_dags,
should_ignore_depends_on_past,
suppress_logs_and_warning,
)
from airflow.utils.log.file_task_handler import _set_task_deferred_context_var
from airflow.utils.log.logging_mixin import StreamLogWriter
from airflow.utils.net import get_hostname
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import DagRunState
@@ -123,8 +109,7 @@ def _get_dag_run(
return dag_run, False
elif not create_if_necessary:
raise DagRunNotFound(
f"DagRun for {dag.dag_id} with run_id or logical_date "
f"of {logical_date_or_run_id!r} not found"
f"DagRun for {dag.dag_id} with run_id or logical_date of {logical_date_or_run_id!r} not found"
)

dag_run_logical_date = timezone.coerce_datetime(logical_date)
@@ -227,237 +212,10 @@ def _get_ti(
return ti, dr_created


def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None | TaskReturnCode:
"""
Run the task in one of three modes:

- using LocalTaskJob
- as a raw task
- by sending it to an executor
"""
if args.local:
return _run_task_by_local_task_job(args, ti)
if args.raw:
return _run_raw_task(args, ti)
_run_task_by_executor(args, dag, ti)
return None


def _run_task_by_executor(args, dag: DAG, ti: TaskInstance) -> None:
"""
Send the task to the executor for execution.

This can result in the task being started on another host, depending on the executor implementation.
"""
from airflow.executors.base_executor import BaseExecutor

if ti.executor:
executor = ExecutorLoader.load_executor(ti.executor)
else:
executor = ExecutorLoader.get_default_executor()
executor.job_id = None
executor.start()
print("Sending to executor.")

# TODO: Task-SDK: this is temporary while we migrate the other executors over
if executor.queue_workload.__func__ is not BaseExecutor.queue_workload: # type: ignore[attr-defined]
from airflow.executors import workloads

if TYPE_CHECKING:
assert dag.relative_fileloc
workload = workloads.ExecuteTask.make(ti, dag_rel_path=Path(dag.relative_fileloc))
with create_session() as session:
executor.queue_workload(workload, session)
else:
executor.queue_task_instance(
ti,
mark_success=args.mark_success,
ignore_all_deps=args.ignore_all_dependencies,
ignore_depends_on_past=should_ignore_depends_on_past(args),
wait_for_past_depends_before_skipping=(args.depends_on_past == "wait"),
ignore_task_deps=args.ignore_dependencies,
ignore_ti_state=args.force,
pool=args.pool,
)
executor.heartbeat()
executor.end()
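
The `queue_workload.__func__ is not BaseExecutor.queue_workload` check above is a generic
way to detect whether a subclass has overridden a base-class method. A self-contained
illustration of the pattern (class and method names here are only for demonstration):

class Base:
    def queue_workload(self, workload) -> None:
        raise NotImplementedError

class Custom(Base):
    def queue_workload(self, workload) -> None:  # overrides Base
        print("queued", workload)

def overrides_queue_workload(obj: Base) -> bool:
    # Comparing the unbound functions tells us whether the subclass
    # replaced the base implementation (same trick as the executor check).
    return type(obj).queue_workload is not Base.queue_workload

assert overrides_queue_workload(Custom())
assert not overrides_queue_workload(Base())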


def _run_task_by_local_task_job(args, ti: TaskInstance) -> TaskReturnCode | None:
"""Run LocalTaskJob, which monitors the raw task execution process."""
job_runner = LocalTaskJobRunner(
job=Job(dag_id=ti.dag_id),
task_instance=ti,
mark_success=args.mark_success,
ignore_all_deps=args.ignore_all_dependencies,
ignore_depends_on_past=should_ignore_depends_on_past(args),
wait_for_past_depends_before_skipping=(args.depends_on_past == "wait"),
ignore_task_deps=args.ignore_dependencies,
ignore_ti_state=args.force,
pool=args.pool,
external_executor_id=_extract_external_executor_id(args),
)
try:
ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
finally:
if args.shut_down_logging:
logging.shutdown()
with suppress(ValueError):
return TaskReturnCode(ret)
return None
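
The `suppress(ValueError)` block above converts the job's integer exit code into a
`TaskReturnCode` member, silently falling through to `return None` when the code is not a
known member. The same pattern in isolation (the enum below is an illustrative subset,
not the real enum from airflow.models.taskinstance):

from contextlib import suppress
from enum import Enum

class TaskReturnCode(Enum):
    DEFERRED = 100  # illustrative subset

def to_return_code(ret: int) -> "TaskReturnCode | None":
    with suppress(ValueError):
        return TaskReturnCode(ret)  # raises ValueError for unknown codes
    return None

assert to_return_code(100) is TaskReturnCode.DEFERRED
assert to_return_code(0) is None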


RAW_TASK_UNSUPPORTED_OPTION = [
"ignore_all_dependencies",
"ignore_dependencies",
"force",
]


def _run_raw_task(args, ti: TaskInstance) -> None | TaskReturnCode:
"""Run the main task handling code."""
return ti._run_raw_task(
mark_success=args.mark_success,
pool=args.pool,
)


def _extract_external_executor_id(args) -> str | None:
if hasattr(args, "external_executor_id"):
return getattr(args, "external_executor_id")
return os.environ.get("external_executor_id", None)


@contextmanager
def _move_task_handlers_to_root(ti: TaskInstance) -> Generator[None, None, None]:
"""
Move handlers for task logging to root logger.

We want anything logged during the task run to be propagated to the task log handlers.
If running in a k8s executor pod, also keep the stream handler on the root logger
so that logs are still emitted to stdout.
"""
# nothing to do
if not ti.log.handlers or settings.DONOT_MODIFY_HANDLERS:
yield
return

# Move task handlers to root, reset the task logger, and restore the original settings on exit.
# If using the k8s executor, ensure the root logger has a console handler so that
# task logs propagate to stdout (this is how the webserver retrieves them while the task is running).
root_logger = logging.getLogger()
console_handler = next((h for h in root_logger.handlers if h.name == "console"), None)
with LoggerMutationHelper(root_logger), LoggerMutationHelper(ti.log) as task_helper:
task_helper.move(root_logger)
if IS_K8S_EXECUTOR_POD or IS_EXECUTOR_CONTAINER:
if console_handler and console_handler not in root_logger.handlers:
root_logger.addHandler(console_handler)
yield
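
`LoggerMutationHelper` does the save/restore bookkeeping here. As a rough, self-contained
stand-in showing the underlying technique with stdlib logging only (this is not the
Airflow API):

import logging
from contextlib import contextmanager


@contextmanager
def move_handlers_to_root(logger: logging.Logger):
    # Re-home the logger's handlers onto root, then restore them on exit.
    root = logging.getLogger()
    moved = list(logger.handlers)
    saved_propagate = logger.propagate
    for handler in moved:
        root.addHandler(handler)
        logger.removeHandler(handler)
    logger.propagate = True  # records now reach the handlers via root
    try:
        yield
    finally:
        for handler in moved:
            root.removeHandler(handler)
            logger.addHandler(handler)
        logger.propagate = saved_propagate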


@contextmanager
def _redirect_stdout_to_ti_log(ti: TaskInstance) -> Generator[None, None, None]:
"""
Redirect stdout and stderr to the ti logger.

Redirect stdout and stderr to the task instance log as INFO and WARNING
level messages, respectively.

If stdout is already redirected (possible when the task is running with the
`--local` option), don't redirect again.
"""
# if sys.stdout is StreamLogWriter, it means we already redirected
# likely before forking in LocalTaskJob
if not isinstance(sys.stdout, StreamLogWriter):
info_writer = StreamLogWriter(ti.log, logging.INFO)
warning_writer = StreamLogWriter(ti.log, logging.WARNING)
with redirect_stdout(info_writer), redirect_stderr(warning_writer):
yield
else:
yield
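
`StreamLogWriter` is essentially a file-like adapter over a logger. A minimal stand-in
showing the redirection technique (illustrative only, not Airflow's actual class):

import logging
from contextlib import redirect_stderr, redirect_stdout


class _LogWriter:
    """File-like object that forwards writes to a logger."""

    def __init__(self, logger: logging.Logger, level: int) -> None:
        self.logger = logger
        self.level = level

    def write(self, message: str) -> int:
        if message.strip():  # skip the bare newlines print() emits
            self.logger.log(self.level, message.rstrip())
        return len(message)

    def flush(self) -> None:
        pass


logging.basicConfig(level=logging.INFO)
ti_log = logging.getLogger("task")
with redirect_stdout(_LogWriter(ti_log, logging.INFO)), redirect_stderr(_LogWriter(ti_log, logging.WARNING)):
    print("hello from the task")  # lands in ti_log at INFO level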


class TaskCommandMarker:
"""Marker for listener hooks, to properly detect from which component they are called."""


@cli_utils.action_cli(check_db=False)
def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None:
"""
Run a single task instance.

Note that there must be at least one DagRun for this to start,
i.e. it must have been scheduled and/or triggered previously.
Alternatively, if you just need to run it for testing, use the
"airflow tasks test ..." command instead.
"""
# Load custom airflow config

if args.local and args.raw:
raise AirflowException(
"Option --raw and --local are mutually exclusive. "
"Please remove one option to execute the command."
)

if args.raw:
unsupported_options = [o for o in RAW_TASK_UNSUPPORTED_OPTION if getattr(args, o)]

if unsupported_options:
unsupported_raw_task_flags = ", ".join(f"--{o}" for o in RAW_TASK_UNSUPPORTED_OPTION)
unsupported_flags = ", ".join(f"--{o}" for o in unsupported_options)
raise AirflowException(
"Option --raw does not work with some of the other options on this command. "
"You can't use --raw option and the following options: "
f"{unsupported_raw_task_flags}. "
f"You provided the option {unsupported_flags}. "
"Delete it to execute the command."
)

if args.cfg_path:
with open(args.cfg_path) as conf_file:
conf_dict = json.load(conf_file)

if os.path.exists(args.cfg_path):
os.remove(args.cfg_path)

conf.read_dict(conf_dict, source=args.cfg_path)
settings.configure_vars()

settings.MASK_SECRETS_IN_LOGS = True

get_listener_manager().hook.on_starting(component=TaskCommandMarker())

if not dag:
_dag = get_dag(args.subdir, args.dag_id, args.read_from_db)
else:
_dag = dag
task = _dag.get_task(task_id=args.task_id)
ti, _ = _get_ti(task, args.map_index, logical_date_or_run_id=args.logical_date_or_run_id, pool=args.pool)
ti.init_run_context(raw=args.raw)

hostname = get_hostname()

log.info("Running %s on host %s", ti, hostname)

task_return_code = None
try:
if args.interactive:
task_return_code = _run_task_by_selected_method(args, _dag, ti)
else:
with _move_task_handlers_to_root(ti), _redirect_stdout_to_ti_log(ti):
task_return_code = _run_task_by_selected_method(args, _dag, ti)
if task_return_code == TaskReturnCode.DEFERRED:
_set_task_deferred_context_var()
finally:
try:
get_listener_manager().hook.before_stopping(component=TaskCommandMarker())
except Exception:
pass
return task_return_code
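
The `--cfg-path` handling above implements a small one-shot handoff: a supervising
process (presumably the one spawning this command) dumps config overrides as JSON, and
this child process loads them into the live config and deletes the file. The same logic
in isolation, using the stdlib ConfigParser API that `conf` extends:

import json
import os
from configparser import ConfigParser


def load_cfg_override(cfg_path: str, conf: ConfigParser) -> None:
    # Read {"section": {"key": "value"}} overrides, then remove the
    # one-shot file so its contents do not linger on disk.
    with open(cfg_path) as conf_file:
        conf_dict = json.load(conf_file)
    if os.path.exists(cfg_path):
        os.remove(cfg_path)
    conf.read_dict(conf_dict, source=cfg_path)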


@cli_utils.action_cli(check_db=False)
@providers_configuration_loaded
def task_failed_deps(args) -> None: