diff --git a/.github/ISSUE_TEMPLATE/airflow_bug_report.yml b/.github/ISSUE_TEMPLATE/airflow_bug_report.yml index d1ee0a25b0603..f991b45d14e51 100644 --- a/.github/ISSUE_TEMPLATE/airflow_bug_report.yml +++ b/.github/ISSUE_TEMPLATE/airflow_bug_report.yml @@ -26,7 +26,7 @@ body: multiple: false options: - "2.10.5" - - "3.0.0a1" + - "3.0.0" - "main (development)" - "Other Airflow 2 version (please specify below)" validations: diff --git a/INTHEWILD.md b/INTHEWILD.md index ef49c61f10bbb..1c4ed574416ec 100644 --- a/INTHEWILD.md +++ b/INTHEWILD.md @@ -93,6 +93,7 @@ Currently, **officially** using Airflow: 1. [Bloomberg](https://www.techatbloomberg.com) [[@skandala23] (https://github.com/skandala23) & [@vfeldsher](https://https://github.com/vfeldsher)] 1. [Bloomreach](https://www.bloomreach.com/) [[@neelborooah](https://github.com/neelborooah) & [@debodirno](https://github.com/debodirno) & [@ayushmnnit](https://github.com/ayushmnnit)] 1. [Blue Yonder](http://www.blue-yonder.com) [[@blue-yonder](https://github.com/blue-yonder)] +1. [Blue3 Investimentos](https://blue3investimentos.com.br) [[@ericcoleta](https://github.com/ericcoleta) & [@plutaniano](https://github.com/plutaniano)] 1. [BlueApron](https://www.blueapron.com) [[@jasonjho](https://github.com/jasonjho) & [@matthewdavidhauser](https://github.com/matthewdavidhauser)] 1. [Bluecore](https://www.bluecore.com) [[@JLDLaughlin](https://github.com/JLDLaughlin)] 1. [Bluekiri](https://bluekiri.com) [[@Bluekiri](https://github.com/bluekiri)] @@ -154,7 +155,7 @@ Currently, **officially** using Airflow: 1. [Crealytics](https://crealytics.com) 1. [Credit Karma](https://www.creditkarma.com/) [[@preete-dixit-ck](https://github.com/preete-dixit-ck) & [@harish-gaggar-ck](https://github.com/harish-gaggar-ck) & [@greg-finley-ck](https://github.com/greg-finley-ck)] 1. [Creditas](https://www.creditas.com.br) [[@dcassiano](https://github.com/dcassiano)] -1. 
[CreditCards.com](https://www.creditcards.com/) [[@vmAggies](https://github.com/vmAggies) & [@jay-wallaby](https://github.com/jay-wallaby)] +1. [CreditCards.com](https://www.creditcards.com/) [[@vmAggies](https://github.com/vmAggies) & [@jay-wallaby](https://github.com/jay-wallaby)] 1. [CRST - The Transportation Solution, Inc.](https://crst.com) 1. [Cryptalizer.com](https://www.cryptalizer.com/) 1. [Currency](https://www.gocurrency.com/) [[@FCLI](https://github.com/FCLI) & [@alexbegg](https://github.com/alexbegg)] @@ -297,7 +298,7 @@ Currently, **officially** using Airflow: 1. [Jeitto](https://www.jeitto.com.br) [[@BrennerPablo](https://github.com/BrennerPablo) & [@ds-mauri](https://github.com/ds-mauri)] 1. [Jetlore](http://www.jetlore.com/) [[@bderose](https://github.com/bderose)] 1. [Jobrapido](https://www.jobrapido.com/) [[@mattiagiupponi](https://github.com/mattiagiupponi)] -1. [JobTeaser](https://www.jobteaser.com) [[@stefani75](https://github.com/stefani75) & [@knil-sama](https://github.com/knil-sama)] +1. [JobTeaser](https://www.jobteaser.com) [[@stefani75](https://github.com/stefani75) & [@knil-sama](https://github.com/knil-sama)] 1. [JULO](https://www.julo.co.id/) [[@sepam](https://github.com/sepam) & [@tenapril](https://github.com/tenapril) & [@verzqy](https://github.com/verzqy)] 1. [Kalibrr](https://www.kalibrr.com/) [[@charlesverdad](https://github.com/charlesverdad)] 1. 
[Kargo](https://kargo.com) [[@chaithra-yenikapati](https://github.com/chaithra-yenikapati), [@akarsh3007](https://github.com/akarsh3007) & [@dineshanchan](https://github.com/dineshanchan)] diff --git a/airflow/api_connexion/schemas/dag_schema.py b/airflow/api_connexion/schemas/dag_schema.py index 9f75f4dad52fe..38a062b1b2836 100644 --- a/airflow/api_connexion/schemas/dag_schema.py +++ b/airflow/api_connexion/schemas/dag_schema.py @@ -51,6 +51,8 @@ class Meta: dag_id = auto_field(dump_only=True) dag_display_name = fields.String(attribute="dag_display_name", dump_only=True) + bundle_name = auto_field(dump_only=True) + bundle_version = auto_field(dump_only=True) is_paused = auto_field() is_active = auto_field(dump_only=True) last_parsed_time = auto_field(dump_only=True) diff --git a/airflow/api_fastapi/execution_api/app.py b/airflow/api_fastapi/execution_api/app.py index a9bbf4dd7888c..b6b36e08c1137 100644 --- a/airflow/api_fastapi/execution_api/app.py +++ b/airflow/api_fastapi/execution_api/app.py @@ -22,12 +22,17 @@ from typing import TYPE_CHECKING import attrs -from fastapi import FastAPI +from fastapi import FastAPI, Request from fastapi.openapi.utils import get_openapi +from fastapi.responses import JSONResponse if TYPE_CHECKING: import httpx +import structlog + +logger = structlog.get_logger(logger_name=__name__) + @asynccontextmanager async def lifespan(app: FastAPI): @@ -86,6 +91,16 @@ def custom_openapi() -> dict: app.openapi = custom_openapi # type: ignore[method-assign] app.include_router(execution_api_router) + + # As we are mounted as a sub app, we don't get any logs for unhandled exceptions without this! 
+ @app.exception_handler(Exception) + def handle_exceptions(request: Request, exc: Exception): + logger.exception("Handle died with an error", exc_info=(type(exc), exc, exc.__traceback__)) + content = {"message": "Internal server error"} + if "correlation-id" in request.headers: + content["correlation-id"] = request.headers["correlation-id"] + return JSONResponse(status_code=500, content=content) + return app diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index a10c1d74bda8d..32a9bb1a382ee 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -172,6 +172,7 @@ def string_lower_type(val): "--bundle-name", ), help=("The name of the DAG bundle to use; may be provided more than once"), + type=str, default=None, action="append", ) @@ -638,75 +639,16 @@ def string_lower_type(val): default=False, ) -# webserver -ARG_PORT = Arg( - ("-p", "--port"), - default=conf.get("webserver", "WEB_SERVER_PORT"), - type=int, - help="The port on which to run the server", -) -ARG_SSL_CERT = Arg( - ("--ssl-cert",), - default=conf.get("webserver", "WEB_SERVER_SSL_CERT"), - help="Path to the SSL certificate for the webserver", -) -ARG_SSL_KEY = Arg( - ("--ssl-key",), - default=conf.get("webserver", "WEB_SERVER_SSL_KEY"), - help="Path to the key to use with the SSL certificate", -) -ARG_WORKERS = Arg( - ("-w", "--workers"), - default=conf.get("webserver", "WORKERS"), - type=int, - help="Number of workers to run the webserver on", -) -ARG_WORKERCLASS = Arg( - ("-k", "--workerclass"), - default=conf.get("webserver", "WORKER_CLASS"), - choices=["sync", "eventlet", "gevent", "tornado"], - help="The worker class to use for Gunicorn", -) -ARG_WORKER_TIMEOUT = Arg( - ("-t", "--worker-timeout"), - default=conf.get("webserver", "WEB_SERVER_WORKER_TIMEOUT"), - type=int, - help="The timeout for waiting on webserver workers", -) -ARG_HOSTNAME = Arg( - ("-H", "--hostname"), - default=conf.get("webserver", "WEB_SERVER_HOST"), - help="Set the hostname on which to run 
the web server", -) -ARG_DEBUG = Arg( - ("-d", "--debug"), help="Use the server that ships with Flask in debug mode", action="store_true" -) -ARG_ACCESS_LOGFILE = Arg( - ("-A", "--access-logfile"), - default=conf.get("webserver", "ACCESS_LOGFILE"), - help="The logfile to store the webserver access log. Use '-' to print to stdout", -) -ARG_ERROR_LOGFILE = Arg( - ("-E", "--error-logfile"), - default=conf.get("webserver", "ERROR_LOGFILE"), - help="The logfile to store the webserver error log. Use '-' to print to stderr", -) -ARG_ACCESS_LOGFORMAT = Arg( - ("-L", "--access-logformat"), - default=conf.get("webserver", "ACCESS_LOGFORMAT"), - help="The access log format for gunicorn logs", -) - # api-server ARG_API_SERVER_PORT = Arg( ("-p", "--port"), - default=9091, + default=conf.get("api", "port"), type=int, help="The port on which to run the API server", ) ARG_API_SERVER_WORKERS = Arg( ("-w", "--workers"), - default=4, + default=conf.get("api", "workers"), type=int, help="Number of workers to run on the API server", ) @@ -717,22 +659,15 @@ def string_lower_type(val): help="The timeout for waiting on API server workers", ) ARG_API_SERVER_HOSTNAME = Arg( - ("-H", "--hostname"), - default="0.0.0.0", # nosec - help="Set the hostname on which to run the API server", + ("-H", "--host"), + default=conf.get("api", "host"), + help="Set the host on which to run the API server", ) ARG_API_SERVER_ACCESS_LOGFILE = Arg( ("-A", "--access-logfile"), + default=conf.get("api", "access_logfile"), help="The logfile to store the access log. Use '-' to print to stdout", ) -ARG_API_SERVER_ERROR_LOGFILE = Arg( - ("-E", "--error-logfile"), - help="The logfile to store the error log. Use '-' to print to stderr", -) -ARG_API_SERVER_ACCESS_LOGFORMAT = Arg( - ("-L", "--access-logformat"), - help="The access log format for gunicorn logs", -) ARG_API_SERVER_APPS = Arg( ("--apps",), help="Applications to run (comma-separated). Default is all. 
Options: core, execution, all", @@ -743,7 +678,17 @@ def string_lower_type(val): help="Enable X-Forwarded-Proto, X-Forwarded-For, X-Forwarded-Port to populate remote address info.", action="store_true", ) - +ARG_SSL_CERT = Arg( + ("--ssl-cert",), + default=conf.get("api", "ssl_cert"), + help="Path to the SSL certificate for the webserver", +) +ARG_SSL_KEY = Arg( + ("--ssl-key",), + default=conf.get("api", "ssl_key"), + help="Path to the key to use with the SSL certificate", +) +ARG_DEV = Arg(("-d", "--dev"), help="Start FastAPI in development mode", action="store_true") # scheduler ARG_NUM_RUNS = Arg( @@ -936,7 +881,7 @@ def string_lower_type(val): ("--columns",), type=string_list_type, help="List of columns to render. (default: ['dag_id', 'fileloc', 'owner', 'is_paused'])", - default=("dag_id", "fileloc", "owners", "is_paused"), + default=("dag_id", "fileloc", "owners", "is_paused", "bundle_name", "bundle_version"), ) ARG_ASSET_LIST_COLUMNS = Arg( @@ -1034,7 +979,7 @@ class GroupCommand(NamedTuple): name="list", help="List all the DAGs", func=lazy_load_command("airflow.cli.commands.remote_commands.dag_command.dag_list_dags"), - args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE, ARG_DAG_LIST_COLUMNS), + args=(ARG_OUTPUT, ARG_VERBOSE, ARG_DAG_LIST_COLUMNS, ARG_BUNDLE_NAME), ), ActionCommand( name="list-import-errors", @@ -1871,13 +1816,11 @@ class GroupCommand(NamedTuple): ARG_STDOUT, ARG_STDERR, ARG_API_SERVER_ACCESS_LOGFILE, - ARG_API_SERVER_ERROR_LOGFILE, - ARG_API_SERVER_ACCESS_LOGFORMAT, ARG_API_SERVER_APPS, ARG_LOG_FILE, ARG_SSL_CERT, ARG_SSL_KEY, - ARG_DEBUG, + ARG_DEV, ARG_API_SERVER_ALLOW_PROXY_FORWARDING, ), ), diff --git a/airflow/cli/commands/legacy_commands.py b/airflow/cli/commands/legacy_commands.py index c9fddec2d76fe..11016a427dcfe 100644 --- a/airflow/cli/commands/legacy_commands.py +++ b/airflow/cli/commands/legacy_commands.py @@ -48,6 +48,7 @@ "create_user": "users create", "delete_user": "users delete", "dags backfill": "backfill create", + "webserver": 
"api-server", } diff --git a/airflow/cli/commands/local_commands/api_server_command.py b/airflow/cli/commands/local_commands/api_server_command.py index 530c340840d14..4f3fb5ab07e59 100644 --- a/airflow/cli/commands/local_commands/api_server_command.py +++ b/airflow/cli/commands/local_commands/api_server_command.py @@ -48,13 +48,12 @@ def api_server(args): apps = args.apps access_logfile = args.access_logfile or "-" - access_logformat = args.access_logformat num_workers = args.workers worker_timeout = args.worker_timeout proxy_headers = args.proxy_headers - if args.debug: - print(f"Starting the API server on port {args.port} and host {args.hostname} debug.") + if args.dev: + print(f"Starting the API server on port {args.port} and host {args.host} in development mode.") log.warning("Running in dev mode, ignoring uvicorn args") run_args = [ @@ -64,13 +63,13 @@ def api_server(args): "--port", str(args.port), "--host", - str(args.hostname), + str(args.host), ] if args.proxy_headers: run_args.append("--proxy-headers") - # There is no way to pass the apps to airflow/api_fastapi/main.py in the debug mode + # There is no way to pass the apps to airflow/api_fastapi/main.py in the development mode # because fastapi dev command does not accept any additional arguments # so environment variable is being used to pass it os.environ["AIRFLOW_API_APPS"] = apps @@ -91,18 +90,17 @@ def api_server(args): Running the uvicorn with: Apps: {apps} Workers: {num_workers} - Host: {args.hostname}:{args.port} + Host: {args.host}:{args.port} Timeout: {worker_timeout} Logfiles: {access_logfile} - Access Logformat: {access_logformat} =================================================================""" ) ) ssl_cert, ssl_key = _get_ssl_cert_and_key_filepaths(args) - setproctitle(f"airflow api_server -- host:{args.hostname} port:{args.port}") + setproctitle(f"airflow api_server -- host:{args.host} port:{args.port}") uvicorn.run( "airflow.api_fastapi.main:app", - host=args.hostname, + 
host=args.host, port=args.port, workers=num_workers, timeout_keep_alive=worker_timeout, diff --git a/airflow/cli/commands/local_commands/standalone_command.py b/airflow/cli/commands/local_commands/standalone_command.py index d8fdb6da7aedf..77c5ad3e32168 100644 --- a/airflow/cli/commands/local_commands/standalone_command.py +++ b/airflow/cli/commands/local_commands/standalone_command.py @@ -203,8 +203,7 @@ def is_ready(self): For now, it's simply time-based. """ return ( - self.port_open(self.web_server_port) - and self.job_running(SchedulerJobRunner) + self.job_running(SchedulerJobRunner) and self.job_running(DagProcessorJobRunner) and self.job_running(TriggererJobRunner) ) diff --git a/airflow/cli/commands/remote_commands/config_command.py b/airflow/cli/commands/remote_commands/config_command.py index 7ab85c5e62105..09e2d0614dbad 100644 --- a/airflow/cli/commands/remote_commands/config_command.py +++ b/airflow/cli/commands/remote_commands/config_command.py @@ -292,6 +292,42 @@ def message(self) -> str: config=ConfigParameter("webserver", "force_log_out_after"), renamed_to=ConfigParameter("webserver", "session_lifetime_minutes"), ), + ConfigChange( + config=ConfigParameter("webserver", "web_server_host"), + renamed_to=ConfigParameter("api", "host"), + ), + ConfigChange( + config=ConfigParameter("webserver", "web_server_port"), + renamed_to=ConfigParameter("api", "port"), + ), + ConfigChange( + config=ConfigParameter("webserver", "workers"), + renamed_to=ConfigParameter("api", "workers"), + ), + ConfigChange( + config=ConfigParameter("webserver", "web_server_worker_timeout"), + renamed_to=ConfigParameter("api", "worker_timeout"), + ), + ConfigChange( + config=ConfigParameter("webserver", "web_server_ssl_cert"), + renamed_to=ConfigParameter("api", "ssl_cert"), + ), + ConfigChange( + config=ConfigParameter("webserver", "web_server_ssl_key"), + renamed_to=ConfigParameter("api", "ssl_key"), + ), + ConfigChange( + config=ConfigParameter("webserver", "access_logfile"), + 
renamed_to=ConfigParameter("api", "access_logfile"), + ), + ConfigChange( + config=ConfigParameter("webserver", "error_logfile"), + was_deprecated=False, + ), + ConfigChange( + config=ConfigParameter("webserver", "access_logformat"), + was_deprecated=False, + ), # policy ConfigChange( config=ConfigParameter("policy", "airflow_local_settings"), diff --git a/airflow/cli/commands/remote_commands/dag_command.py b/airflow/cli/commands/remote_commands/dag_command.py index b478798c24ad2..03fb97aba3978 100644 --- a/airflow/cli/commands/remote_commands/dag_command.py +++ b/airflow/cli/commands/remote_commands/dag_command.py @@ -14,6 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + """Dag sub-commands.""" from __future__ import annotations @@ -28,7 +29,7 @@ from typing import TYPE_CHECKING import re2 -from sqlalchemy import select +from sqlalchemy import func, select from airflow.api.client import get_current_api_client from airflow.api_connexion.schemas.dag_schema import dag_schema @@ -38,6 +39,7 @@ from airflow.exceptions import AirflowException from airflow.jobs.job import Job from airflow.models import DagBag, DagModel, DagRun, TaskInstance +from airflow.models.errors import ParseImportError from airflow.models.serialized_dag import SerializedDagModel from airflow.sdk.definitions._internal.dag_parsing_context import _airflow_parsing_context_manager from airflow.utils import cli as cli_utils, timezone @@ -224,6 +226,8 @@ def _get_dagbag_dag_details(dag: DAG) -> dict: return { "dag_id": dag.dag_id, "dag_display_name": dag.dag_display_name, + "bundle_name": dag.get_bundle_name(), + "bundle_version": dag.get_bundle_version(), "is_paused": dag.get_is_paused(), "is_active": dag.get_is_active(), "last_parsed_time": None, @@ -322,11 +326,12 @@ def print_execution_interval(interval: DataInterval | None): @suppress_logs_and_warning @providers_configuration_loaded @provide_session 
-def dag_list_dags(args, session=NEW_SESSION) -> None: +def dag_list_dags(args, session: Session = NEW_SESSION) -> None: """Display dags with or without stats at the command line.""" cols = args.columns if args.columns else [] invalid_cols = [c for c in cols if c not in dag_schema.fields] valid_cols = [c for c in cols if c in dag_schema.fields] + if invalid_cols: from rich import print as rich_print @@ -335,8 +340,18 @@ def dag_list_dags(args, session=NEW_SESSION) -> None: f"List of valid columns: {list(dag_schema.fields.keys())}", file=sys.stderr, ) - dagbag = DagBag(process_subdir(args.subdir)) - if dagbag.import_errors: + + dagbag = DagBag(read_dags_from_db=True) + dagbag.collect_dags_from_db() + + # Get import errors from the DB + query = select(func.count()).select_from(ParseImportError) + if args.bundle_name: + query = query.where(ParseImportError.bundle_name.in_(args.bundle_name)) + + dagbag_import_errors = session.scalar(query) + + if dagbag_import_errors > 0: from rich import print as rich_print rich_print( @@ -353,8 +368,19 @@ def get_dag_detail(dag: DAG) -> dict: dag_detail = _get_dagbag_dag_details(dag) return {col: dag_detail[col] for col in valid_cols} + def filter_dags_by_bundle(dags: list[DAG], bundle_names: list[str] | None) -> list[DAG]: + """Filter DAGs based on the specified bundle name, if provided.""" + if not bundle_names: + return dags + + validate_dag_bundle_arg(bundle_names) + return [dag for dag in dags if dag.get_bundle_name() in bundle_names] + AirflowConsole().print_as( - data=sorted(dagbag.dags.values(), key=operator.attrgetter("dag_id")), + data=sorted( + filter_dags_by_bundle(list(dagbag.dags.values()), args.bundle_name), + key=operator.attrgetter("dag_id"), + ), output=args.output, mapper=get_dag_detail, ) @@ -364,7 +390,7 @@ def get_dag_detail(dag: DAG) -> dict: @suppress_logs_and_warning @providers_configuration_loaded @provide_session -def dag_details(args, session=NEW_SESSION): +def dag_details(args, session: Session = 
NEW_SESSION): """Get DAG details given a DAG id.""" dag = DagModel.get_dagmodel(args.dag_id, session=session) if not dag: diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index e7907a6c8d961..48f142a2875dc 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1322,6 +1322,57 @@ api: type: string example: ~ default: "http://localhost:9091" + host: + description: | + The ip specified when starting the api server + version_added: ~ + type: string + example: ~ + default: "0.0.0.0" + port: + description: | + The port on which to run the api server + version_added: ~ + type: string + example: ~ + default: "9091" + workers: + description: | + Number of workers to run on the API server + version_added: ~ + type: string + example: ~ + default: "4" + worker_timeout: + description: | + Number of seconds the API server waits before timing out on a worker + version_added: ~ + type: string + example: ~ + default: "120" + access_logfile: + description: | + Log files for the api server. '-' means log to stderr. + version_added: ~ + type: string + example: ~ + default: "-" + ssl_cert: + description: | + Paths to the SSL certificate and key for the api server. When both are + provided SSL will be enabled. This does not change the api server port. + version_added: ~ + type: string + example: ~ + default: "" + ssl_key: + description: | + Paths to the SSL certificate and key for the api server. When both are + provided SSL will be enabled. This does not change the api server port. + version_added: ~ + type: string + example: ~ + default: "" auth_backends: description: | Comma separated list of auth backends to authenticate users of the API. 
See @@ -1534,36 +1585,6 @@ webserver: example: "America/New_York" # Default is left as UTC for now so the date's don't "suddenly" change on upgrade default: "UTC" - web_server_host: - description: | - The ip specified when starting the web server - version_added: ~ - type: string - example: ~ - default: "0.0.0.0" - web_server_port: - description: | - The port on which to run the web server - version_added: ~ - type: string - example: ~ - default: "8080" - web_server_ssl_cert: - description: | - Paths to the SSL certificate and key for the web server. When both are - provided SSL will be enabled. This does not change the web server port. - version_added: ~ - type: string - example: ~ - default: "" - web_server_ssl_key: - description: | - Paths to the SSL certificate and key for the web server. When both are - provided SSL will be enabled. This does not change the web server port. - version_added: ~ - type: string - example: ~ - default: "" session_backend: description: | The type of backend used to store web session data, can be ``database`` or ``securecookie``. For the @@ -1590,13 +1611,6 @@ webserver: type: string example: ~ default: "120" - web_server_worker_timeout: - description: | - Number of seconds the gunicorn webserver waits before timing out on a worker - version_added: ~ - type: string - example: ~ - default: "120" worker_refresh_batch_size: description: | Number of workers to refresh at a time through Gunicorn's built-in worker management. @@ -1636,13 +1650,6 @@ webserver: sensitive: true example: ~ default: "{SECRET_KEY}" - workers: - description: | - Number of workers to run the Gunicorn web server - version_added: ~ - type: string - example: ~ - default: "4" worker_class: description: | The worker class gunicorn should use. Choices include @@ -1666,30 +1673,6 @@ webserver: type: string example: ~ default: "sync" - access_logfile: - description: | - Log files for the gunicorn webserver. '-' means log to stderr. 
- version_added: ~ - type: string - example: ~ - default: "-" - error_logfile: - description: | - Log files for the gunicorn webserver. '-' means log to stderr. - version_added: ~ - type: string - example: ~ - default: "-" - access_logformat: - description: | - Access log format for gunicorn webserver. - default format is ``%%(h)s %%(l)s %%(u)s %%(t)s "%%(r)s" %%(s)s %%(b)s "%%(f)s" "%%(a)s"`` - See `Gunicorn Settings: 'access_log_format' Reference - `__ for more details - version_added: 2.0.0 - type: string - example: ~ - default: "" expose_config: description: | Expose the configuration file in the web server. Set to ``non-sensitive-only`` to show all values diff --git a/airflow/configuration.py b/airflow/configuration.py index 350aa5fd3ca1a..affda6fc476b8 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -326,6 +326,13 @@ def sensitive_config_values(self) -> set[tuple[str, str]]: # DeprecationWarning will be issued and the old option will be used instead deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = { ("dag_processor", "refresh_interval"): ("scheduler", "dag_dir_list_interval", "3.0"), + ("api", "host"): ("webserver", "web_server_host", "3.0"), + ("api", "port"): ("webserver", "web_server_port", "3.0"), + ("api", "workers"): ("webserver", "workers", "3.0"), + ("api", "worker_timeout"): ("webserver", "web_server_worker_timeout", "3.0"), + ("api", "ssl_cert"): ("webserver", "web_server_ssl_cert", "3.0"), + ("api", "ssl_key"): ("webserver", "web_server_ssl_key", "3.0"), + ("api", "access_logfile"): ("webserver", "access_logfile", "3.0"), } # A mapping of new section -> (old section, since_version). 
diff --git a/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx b/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx index 25972500d8ed4..1f08ab48dd363 100644 --- a/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx +++ b/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx @@ -105,8 +105,8 @@ const ClearTaskInstanceDialog = ({ onClose, open, taskInstance }: Props) => { multiple onChange={setSelectedOptions} options={[ - { label: "Past", value: "past" }, - { label: "Future", value: "future" }, + { disabled: taskInstance.logical_date === null, label: "Past", value: "past" }, + { disabled: taskInstance.logical_date === null, label: "Future", value: "future" }, { label: "Upstream", value: "upstream" }, { label: "Downstream", value: "downstream" }, { label: "Only Failed", value: "onlyFailed" }, diff --git a/airflow/ui/src/components/TogglePause.tsx b/airflow/ui/src/components/TogglePause.tsx index 78049f625e696..2b94340fff5a6 100644 --- a/airflow/ui/src/components/TogglePause.tsx +++ b/airflow/ui/src/components/TogglePause.tsx @@ -23,6 +23,7 @@ import { useCallback } from "react"; import { UseDagRunServiceGetDagRunsKeyFn, UseDagServiceGetDagDetailsKeyFn, + UseDagServiceGetDagKeyFn, useDagServiceGetDagsKey, useDagServicePatchDag, useDagsServiceRecentDagRunsKey, @@ -48,6 +49,7 @@ export const TogglePause = ({ dagDisplayName, dagId, isPaused, skipConfirm }: Pr const queryKeys = [ [useDagServiceGetDagsKey], [useDagsServiceRecentDagRunsKey], + UseDagServiceGetDagKeyFn({ dagId }, [{ dagId }]), UseDagServiceGetDagDetailsKeyFn({ dagId }, [{ dagId }]), UseDagRunServiceGetDagRunsKeyFn({ dagId }, [{ dagId }]), UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId: "~" }, [{ dagId, dagRunId: "~" }]), diff --git a/airflow/ui/src/layouts/Details/DagBreadcrumb.tsx b/airflow/ui/src/layouts/Details/DagBreadcrumb.tsx index aa906a0b6ce34..4b656764a6510 100644 --- 
a/airflow/ui/src/layouts/Details/DagBreadcrumb.tsx +++ b/airflow/ui/src/layouts/Details/DagBreadcrumb.tsx @@ -19,12 +19,11 @@ import { HStack, Stat } from "@chakra-ui/react"; import type { ReactNode } from "react"; import { LiaSlashSolid } from "react-icons/lia"; -import { Link as RouterLink, useParams, useSearchParams } from "react-router-dom"; +import { Link as RouterLink, useParams } from "react-router-dom"; import { useDagRunServiceGetDagRun, useDagServiceGetDagDetails, - useTaskInstanceServiceGetMappedTaskInstance, useTaskServiceGetTask, } from "openapi/queries"; import { StateBadge } from "src/components/StateBadge"; @@ -33,11 +32,7 @@ import { TogglePause } from "src/components/TogglePause"; import { Breadcrumb } from "src/components/ui"; export const DagBreadcrumb = () => { - const { dagId = "", runId, taskId } = useParams(); - - const [searchParams] = useSearchParams(); - const mapIndexParam = searchParams.get("map_index"); - const mapIndex = parseInt(mapIndexParam ?? "-1", 10); + const { dagId = "", mapIndex = "-1", runId, taskId } = useParams(); const { data: dag } = useDagServiceGetDagDetails({ dagId, @@ -56,19 +51,6 @@ export const DagBreadcrumb = () => { const { data: task } = useTaskServiceGetTask({ dagId, taskId }, undefined, { enabled: Boolean(taskId) }); - const { data: taskInstance } = useTaskInstanceServiceGetMappedTaskInstance( - { - dagId, - dagRunId: runId ?? "", - mapIndex, - taskId: taskId ?? "", - }, - undefined, - { - enabled: Boolean(runId) && Boolean(taskId), - }, - ); - const links: Array<{ label: ReactNode | string; labelExtra?: ReactNode; title?: string; value?: string }> = [ { label: "Dags", value: "/dags" }, @@ -99,7 +81,18 @@ export const DagBreadcrumb = () => { // Add task breadcrumb if (runId !== undefined && taskId !== undefined) { - links.push({ label: taskInstance?.task_display_name ?? taskId, title: "Task" }); + if (task?.is_mapped) { + links.push({ + label: `${task.task_display_name ?? 
taskId} [ ]`, + title: "Task", + value: `/dags/${dagId}/runs/${runId}/tasks/${taskId}/mapped`, + }); + } else { + links.push({ + label: task?.task_display_name ?? taskId, + title: "Task", + }); + } } if (runId === undefined && taskId !== undefined) { @@ -107,8 +100,8 @@ export const DagBreadcrumb = () => { links.push({ label: task?.task_display_name ?? taskId, title: "Task" }); } - if (mapIndexParam !== null) { - links.push({ label: mapIndexParam, title: "Map Index" }); + if (mapIndex !== "-1") { + links.push({ label: mapIndex, title: "Map Index" }); } return ( diff --git a/airflow/ui/src/layouts/Details/DetailsLayout.tsx b/airflow/ui/src/layouts/Details/DetailsLayout.tsx index 0b2178e8f0a3d..e2fa8cf77b512 100644 --- a/airflow/ui/src/layouts/Details/DetailsLayout.tsx +++ b/airflow/ui/src/layouts/Details/DetailsLayout.tsx @@ -92,7 +92,7 @@ export const DetailsLayout = ({ children, error, isLoading, tabs }: Props) => { {children} - + diff --git a/airflow/ui/src/layouts/Details/Graph/TaskLink.tsx b/airflow/ui/src/layouts/Details/Graph/TaskLink.tsx index dab512d822a84..e933329b02901 100644 --- a/airflow/ui/src/layouts/Details/Graph/TaskLink.tsx +++ b/airflow/ui/src/layouts/Details/Graph/TaskLink.tsx @@ -25,13 +25,13 @@ type Props = { readonly id: string; } & TaskNameProps; -export const TaskLink = ({ id, isGroup, ...rest }: Props) => { +export const TaskLink = ({ id, isGroup, isMapped, ...rest }: Props) => { const { dagId = "", runId, taskId } = useParams(); const [searchParams] = useSearchParams(); // We don't have a task group details page to link to if (isGroup) { - return ; + return ; } return ( @@ -39,11 +39,11 @@ export const TaskLink = ({ id, isGroup, ...rest }: Props) => { - + ); diff --git a/airflow/ui/src/layouts/Details/Grid/GridTI.tsx b/airflow/ui/src/layouts/Details/Grid/GridTI.tsx index fc939eb8f2121..85a8342d8ac42 100644 --- a/airflow/ui/src/layouts/Details/Grid/GridTI.tsx +++ b/airflow/ui/src/layouts/Details/Grid/GridTI.tsx @@ -27,6 +27,7 @@ import { 
StateIcon } from "src/components/StateIcon"; type Props = { readonly dagId: string; readonly isGroup?: boolean; + readonly isMapped?: boolean | null; readonly label: string; readonly runId: string; readonly search: string; @@ -50,9 +51,9 @@ const onMouseLeave = (event: MouseEvent) => { }); }; -const Instance = ({ dagId, runId, search, state, taskId }: Props) => ( +const Instance = ({ dagId, isGroup, isMapped, runId, search, state, taskId }: Props) => ( ( transition="background-color 0.2s" zIndex={1} > - + {isGroup ? ( ( /> )} - + ) : ( + + + {state === undefined ? undefined : ( + + )} + + + )} ); diff --git a/airflow/ui/src/layouts/Details/Grid/TaskInstances.tsx b/airflow/ui/src/layouts/Details/Grid/TaskInstances.tsx index 4e0929e71823e..7c763999bacff 100644 --- a/airflow/ui/src/layouts/Details/Grid/TaskInstances.tsx +++ b/airflow/ui/src/layouts/Details/Grid/TaskInstances.tsx @@ -47,6 +47,7 @@ export const TaskInstances = ({ nodes, runId, taskInstances }: Props) => { ; }; -export const NavTabs = ({ keepSearch, tabs }: Props) => { - const [searchParams] = useSearchParams(); - +export const NavTabs = ({ tabs }: Props) => { const containerRef = useRef(null); const containerWidth = useContainerWidth(containerRef); @@ -42,8 +39,6 @@ export const NavTabs = ({ keepSearch, tabs }: Props) => { title={label} to={{ pathname: value, - // Preserve search params when navigating - search: keepSearch ? searchParams.toString() : undefined, }} > {({ isActive }) => ( diff --git a/airflow/ui/src/pages/Dag/Backfills/Backfills.tsx b/airflow/ui/src/pages/Dag/Backfills/Backfills.tsx new file mode 100644 index 0000000000000..30dc9649b2350 --- /dev/null +++ b/airflow/ui/src/pages/Dag/Backfills/Backfills.tsx @@ -0,0 +1,135 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Box, Heading, Text } from "@chakra-ui/react"; +import type { ColumnDef } from "@tanstack/react-table"; +import { useParams } from "react-router-dom"; + +import { useBackfillServiceListBackfills } from "openapi/queries"; +import type { BackfillResponse } from "openapi/requests/types.gen"; +import { DataTable } from "src/components/DataTable"; +import { useTableURLState } from "src/components/DataTable/useTableUrlState"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import Time from "src/components/Time"; +import { reprocessBehaviors } from "src/constants/reprocessBehaviourParams"; +import { getDuration, pluralize } from "src/utils"; + +const columns: Array> = [ + { + accessorKey: "reprocess_behavior", + cell: ({ row }) => ( + + { + reprocessBehaviors.find((rb: { value: string }) => rb.value === row.original.reprocess_behavior) + ?.label + } + + ), + enableSorting: false, + header: "Reprocess Behavior", + }, + { + accessorKey: "max_active_runs", + enableSorting: false, + header: "Max Active Runs", + }, + { + accessorKey: "created_at", + cell: ({ row }) => ( + + + ), + enableSorting: false, + header: "Created at", + }, + { + accessorKey: "completed_at", + cell: ({ row }) => ( + + + ), + enableSorting: false, + header: "Completed at", + }, + { + accessorKey: "date_from", + cell: ({ row }) => ( + + + ), + enableSorting: false, + header: "From", + }, + { + 
accessorKey: "date_to", + cell: ({ row }) => ( + + + ), + enableSorting: false, + header: "To", + }, + { + accessorKey: "duration", + cell: ({ row }) => ( + + {row.original.completed_at === null + ? "" + : `${getDuration(row.original.created_at, row.original.completed_at)}s`} + + ), + enableSorting: false, + header: "Duration", + }, +]; + +export const Backfills = () => { + const { setTableURLState, tableURLState } = useTableURLState(); + + const { pagination } = tableURLState; + + const { dagId = "" } = useParams(); + + const { data, error, isFetching, isLoading } = useBackfillServiceListBackfills({ + dagId, + limit: pagination.pageSize, + offset: pagination.pageIndex * pagination.pageSize, + }); + + return ( + + + + {pluralize("Backfill", data ? data.total_entries : 0)} + + + + ); +}; diff --git a/airflow/ui/src/pages/Dag/Backfills/index.ts b/airflow/ui/src/pages/Dag/Backfills/index.ts new file mode 100644 index 0000000000000..bf74d95d55056 --- /dev/null +++ b/airflow/ui/src/pages/Dag/Backfills/index.ts @@ -0,0 +1,20 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +export { Backfills } from "./Backfills"; diff --git a/airflow/ui/src/pages/Dag/Dag.tsx b/airflow/ui/src/pages/Dag/Dag.tsx index 4f5372689ab27..da86ada2c93d3 100644 --- a/airflow/ui/src/pages/Dag/Dag.tsx +++ b/airflow/ui/src/pages/Dag/Dag.tsx @@ -17,7 +17,7 @@ * under the License. */ import { ReactFlowProvider } from "@xyflow/react"; -import { FiBarChart, FiCode } from "react-icons/fi"; +import { FiBarChart, FiCode, FiRotateCcw } from "react-icons/fi"; import { LuChartColumn } from "react-icons/lu"; import { MdOutlineEventNote } from "react-icons/md"; import { useParams } from "react-router-dom"; @@ -33,6 +33,7 @@ const tabs = [ { icon: , label: "Overview", value: "" }, { icon: , label: "Runs", value: "runs" }, { icon: , label: "Tasks", value: "tasks" }, + { icon: , label: "Backfills", value: "backfills" }, { icon: , label: "Events", value: "events" }, { icon: , label: "Code", value: "code" }, ]; diff --git a/airflow/ui/src/pages/Dashboard/HistoricalMetrics/AssetEvent.tsx b/airflow/ui/src/pages/Dashboard/HistoricalMetrics/AssetEvent.tsx index 9acc43e8d311e..e5bf931d17382 100644 --- a/airflow/ui/src/pages/Dashboard/HistoricalMetrics/AssetEvent.tsx +++ b/airflow/ui/src/pages/Dashboard/HistoricalMetrics/AssetEvent.tsx @@ -58,7 +58,7 @@ export const AssetEvent = ({ event }: { readonly event: AssetEventResponse }) => Source: {source === "" ? ( -1 ? `/mapped/${event.source_map_index}` : ""}`} > {event.source_dag_id} diff --git a/airflow/ui/src/pages/MappedTaskInstance/Header.tsx b/airflow/ui/src/pages/MappedTaskInstance/Header.tsx new file mode 100644 index 0000000000000..9396802a2d845 --- /dev/null +++ b/airflow/ui/src/pages/MappedTaskInstance/Header.tsx @@ -0,0 +1,64 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Box } from "@chakra-ui/react"; +import type { ReactNode } from "react"; +import { MdOutlineTask } from "react-icons/md"; + +import type { GridTaskInstanceSummary } from "openapi/requests/types.gen"; +import { HeaderCard } from "src/components/HeaderCard"; +import Time from "src/components/Time"; +import { getDuration } from "src/utils"; + +export const Header = ({ + isRefreshing, + taskInstance, +}: { + readonly isRefreshing?: boolean; + readonly taskInstance: GridTaskInstanceSummary; +}) => { + const entries: Array<{ label: string; value: number | ReactNode | string }> = []; + + if (taskInstance.child_states !== null) { + Object.entries(taskInstance.child_states).forEach(([state, count]) => { + if (count > 0) { + entries.push({ label: `Total ${state}`, value: count }); + } + }); + } + const stats = [ + { label: "Task Count", value: taskInstance.task_count }, + ...entries, + { label: "Start", value: