Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Deprecate dagrun conf and add params argument for the different APIs #29174

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion airflow/api/client/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ def __init__(self, api_base_url, auth=None, session: httpx.Client | None = None)
if auth:
self._session.auth = auth

def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True):
def trigger_dag(
self, dag_id, run_id=None, conf=None, params=None, execution_date=None, replace_microseconds=True
):
"""Create a dag run for the specified dag.

:param dag_id:
:param run_id:
:param conf:
:param params:
:param execution_date:
:param replace_microseconds:
:return:
Expand Down
6 changes: 5 additions & 1 deletion airflow/api/client/json_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@ def _request(self, url: str, json=None, method: str = "GET") -> dict:
raise OSError(data.get("error", "Server error"))
return resp.json()

def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True):
def trigger_dag(
self, dag_id, run_id=None, conf=None, params=None, execution_date=None, replace_microseconds=True
):
"""Trigger a DAG run.

:param dag_id: The ID of the DAG to trigger.
:param run_id: The ID of the DAG run to create. If not provided, a default ID will be generated.
:param conf: A dictionary containing configuration data to pass to the DAG run.
:param params: A dictionary of values to set as the DAG run's params.
:param execution_date: The execution date for the DAG run, in the format "YYYY-MM-DDTHH:MM:SS".
:param replace_microseconds: Whether to replace microseconds in the execution date with zeros.
:return: A message indicating the status of the DAG run trigger.
Expand All @@ -69,6 +72,7 @@ def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, repla
data = {
"run_id": run_id,
"conf": conf,
"params": params,
"execution_date": execution_date,
"replace_microseconds": replace_microseconds,
}
Expand Down
5 changes: 4 additions & 1 deletion airflow/api/client/local_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
class Client(api_client.Client):
"""Local API client implementation."""

def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True):
def trigger_dag(
self, dag_id, run_id=None, conf=None, params=None, execution_date=None, replace_microseconds=True
):
dag_run = trigger_dag.trigger_dag(
dag_id=dag_id,
run_id=run_id,
conf=conf,
params=params,
execution_date=execution_date,
replace_microseconds=replace_microseconds,
)
Expand Down
8 changes: 8 additions & 0 deletions airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def _trigger_dag(
dag_bag: DagBag,
run_id: str | None = None,
conf: dict | str | None = None,
params: dict | str | None = None,
execution_date: datetime | None = None,
replace_microseconds: bool = True,
) -> list[DagRun | None]:
Expand All @@ -42,6 +43,7 @@ def _trigger_dag(
:param dag_bag: DAG Bag model
:param run_id: ID of the dag_run
:param conf: configuration
:param params: dagrun params
:param execution_date: date of execution
:param replace_microseconds: whether microseconds should be zeroed
:return: list of triggered dags
Expand Down Expand Up @@ -80,6 +82,8 @@ def _trigger_dag(
run_conf = None
if conf:
run_conf = conf if isinstance(conf, dict) else json.loads(conf)
if params:
params = params if isinstance(params, dict) else json.loads(params)

dag_runs = []
dags_to_run = [dag] + dag.subdags
Expand All @@ -89,6 +93,7 @@ def _trigger_dag(
execution_date=execution_date,
state=DagRunState.QUEUED,
conf=run_conf,
params=params,
external_trigger=True,
dag_hash=dag_bag.dags_hash.get(dag_id),
data_interval=data_interval,
Expand All @@ -102,6 +107,7 @@ def trigger_dag(
dag_id: str,
run_id: str | None = None,
conf: dict | str | None = None,
params: dict | str | None = None,
execution_date: datetime | None = None,
replace_microseconds: bool = True,
) -> DagRun | None:
Expand All @@ -110,6 +116,7 @@ def trigger_dag(
:param dag_id: DAG ID
:param run_id: ID of the dag_run
:param conf: configuration
:param params: dagrun params
:param execution_date: date of execution
:param replace_microseconds: whether microseconds should be zeroed
:return: first dag run triggered - even if more than one Dag Runs were triggered or None
Expand All @@ -124,6 +131,7 @@ def trigger_dag(
dag_bag=dagbag,
run_id=run_id,
conf=conf,
params=params,
execution_date=execution_date,
replace_microseconds=replace_microseconds,
)
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def _fetch_dag_runs(
"updated_at",
"external_trigger",
"conf",
"params",
]
query = apply_sorting(query, order_by, to_replace, allowed_filter_attrs)
return query.offset(offset).limit(limit).all(), total_entries
Expand Down Expand Up @@ -340,6 +341,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
data_interval=dag.timetable.infer_manual_data_interval(run_after=logical_date),
state=DagRunState.QUEUED,
conf=post_body.get("conf"),
params=post_body.get("params"),
external_trigger=True,
dag_hash=get_airflow_app().dag_bag.dags_hash.get(dag_id),
session=session,
Expand Down
11 changes: 11 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2809,12 +2809,23 @@ components:
default: true
readOnly: true
conf:
type: object
description: |
JSON object describing additional configuration parameters.
The value of this field can be set only when creating the object. If you try to modify the
field of an existing object, the request fails with a BAD_REQUEST error.

*Deprecated since version 2.6.0*: Use 'params' instead.
deprecated: true
params:
type: object
description: |
JSON object describing additional configuration parameters.

The value of this field can be set only when creating the object. If you try to modify the
field of an existing object, the request fails with a BAD_REQUEST error.

*New in version 2.6.0*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs a bit of update if you re-vitalize

note:
type: string
description: |
Expand Down
7 changes: 4 additions & 3 deletions airflow/api_connexion/schemas/dag_run_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
from airflow.utils.types import DagRunType


class ConfObject(fields.Field):
"""The conf field."""
class ParamsObject(fields.Field):
"""The params field."""

def _serialize(self, value, attr, obj, **kwargs):
if not value:
Expand Down Expand Up @@ -68,7 +68,8 @@ class Meta:
end_date = auto_field(dump_only=True)
state = DagStateField(dump_only=True)
external_trigger = auto_field(dump_default=True, dump_only=True)
conf = ConfObject()
conf = ParamsObject()
params = ParamsObject()
data_interval_start = auto_field(dump_only=True)
data_interval_end = auto_field(dump_only=True)
last_scheduling_decision = auto_field(dump_only=True)
Expand Down
14 changes: 13 additions & 1 deletion airflow/cli/cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ def string_lower_type(val):
# trigger_dag
ARG_RUN_ID = Arg(("-r", "--run-id"), help="Helps to identify this run")
ARG_CONF = Arg(("-c", "--conf"), help="JSON string that gets pickled into the DagRun's conf attribute")
ARG_PARAMS = Arg(("-p", "--params"), help="JSON string that gets pickled into the DagRun's params attribute")
ARG_EXEC_DATE = Arg(("-e", "--exec-date"), help="The execution date of the DAG", type=parsedate)
ARG_REPLACE_MICRO = Arg(
("--no-replace-microseconds",),
Expand Down Expand Up @@ -1179,7 +1180,16 @@ class GroupCommand(NamedTuple):
name="trigger",
help="Trigger a DAG run",
func=lazy_load_command("airflow.cli.commands.dag_command.dag_trigger"),
args=(ARG_DAG_ID, ARG_SUBDIR, ARG_RUN_ID, ARG_CONF, ARG_EXEC_DATE, ARG_VERBOSE, ARG_REPLACE_MICRO),
args=(
ARG_DAG_ID,
ARG_SUBDIR,
ARG_RUN_ID,
ARG_CONF,
ARG_PARAMS,
ARG_EXEC_DATE,
ARG_VERBOSE,
ARG_REPLACE_MICRO,
),
),
ActionCommand(
name="delete",
Expand Down Expand Up @@ -1273,6 +1283,7 @@ class GroupCommand(NamedTuple):
ARG_DRY_RUN,
ARG_VERBOSE,
ARG_CONF,
ARG_PARAMS,
ARG_RESET_DAG_RUN,
ARG_RERUN_FAILED_TASKS,
ARG_RUN_BACKWARDS,
Expand Down Expand Up @@ -1306,6 +1317,7 @@ class GroupCommand(NamedTuple):
ARG_DAG_ID,
ARG_EXECUTION_DATE_OPTIONAL,
ARG_CONF,
ARG_PARAMS,
ARG_SUBDIR,
ARG_SHOW_DAGRUN,
ARG_IMGCAT_DAGRUN,
Expand Down
11 changes: 9 additions & 2 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ def dag_backfill(args, dag=None):
run_conf = None
if args.conf:
run_conf = json.loads(args.conf)

params = None
if args.params:
params = json.loads(args.params)
for dag in dags:
if args.task_regex:
dag = dag.partial_subset(
Expand Down Expand Up @@ -124,6 +126,7 @@ def dag_backfill(args, dag=None):
delay_on_limit_secs=args.delay_on_limit,
verbose=args.verbose,
conf=run_conf,
params=params,
rerun_failed_tasks=args.rerun_failed_tasks,
run_backwards=args.run_backwards,
continue_on_failures=args.continue_on_failures,
Expand All @@ -146,6 +149,7 @@ def dag_trigger(args):
dag_id=args.dag_id,
run_id=args.run_id,
conf=args.conf,
params=args.params,
execution_date=args.exec_date,
replace_microseconds=args.replace_microseconds,
)
Expand Down Expand Up @@ -459,9 +463,12 @@ def dag_test(args, dag=None, session=None):
run_conf = json.loads(args.conf)
except ValueError as e:
raise SystemExit(f"Configuration {args.conf!r} is not valid JSON. Error: {e}")
params = None
if args.params:
params = json.loads(args.params)
execution_date = args.execution_date or timezone.utcnow()
dag = dag or get_dag(subdir=args.subdir, dag_id=args.dag_id)
dag.test(execution_date=execution_date, run_conf=run_conf, session=session)
dag.test(execution_date=execution_date, run_conf=run_conf, params=params, session=session)
show_dagrun = args.show_dagrun
imgcat = args.imgcat_dagrun
filename = args.save_dagrun
Expand Down
5 changes: 5 additions & 0 deletions airflow/jobs/backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def __init__(
delay_on_limit_secs=1.0,
verbose=False,
conf=None,
params=None,
rerun_failed_tasks=False,
run_backwards=False,
run_at_least_once=False,
Expand All @@ -137,6 +138,7 @@ def __init__(
:param delay_on_limit_secs:
:param verbose:
:param conf: a dictionary which user could pass k-v pairs for backfill
:param params: a dictionary of key-value pairs the user can pass as DagRun params for the backfill
:param rerun_failed_tasks: flag to whether to
auto rerun the failed task in backfill
:param run_backwards: Whether to process the dates from most to least recent
Expand All @@ -157,6 +159,7 @@ def __init__(
self.delay_on_limit_secs = delay_on_limit_secs
self.verbose = verbose
self.conf = conf
self.params = params
self.rerun_failed_tasks = rerun_failed_tasks
self.run_backwards = run_backwards
self.run_at_least_once = run_at_least_once
Expand Down Expand Up @@ -321,6 +324,7 @@ def _get_dag_run(self, dagrun_info: DagRunInfo, dag: DAG, session: Session = Non
respect_dag_max_active_limit = False
# Fixes --conf overwrite for backfills with already existing DagRuns
run.conf = self.conf or {}
run.params = self.params or {}
# start_date is cleared for existing DagRuns
run.start_date = timezone.utcnow()
else:
Expand All @@ -339,6 +343,7 @@ def _get_dag_run(self, dagrun_info: DagRunInfo, dag: DAG, session: Session = Non
external_trigger=False,
session=session,
conf=self.conf,
params=self.params,
run_type=DagRunType.BACKFILL_JOB,
creating_job_id=self.id,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add params column to DagRun

Revision ID: e8a79aa51603
Revises: 6abdffdd4815
Create Date: 2023-01-29 23:29:47.426929

"""
from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "e8a79aa51603"  # unique ID of this migration
down_revision = "6abdffdd4815"  # migration this one is applied on top of
branch_labels = None  # no alternate migration branches
depends_on = None  # no cross-branch dependencies
airflow_version = "2.6.0"  # first Airflow release that ships this migration


def upgrade():
    """Apply adding params column to DagRun"""
    # Nullable so rows that predate this migration stay valid without a backfill.
    # NOTE(review): PickleType mirrors how other DagRun payload columns are stored,
    # but pickled data is opaque in SQL and unsafe to load from untrusted sources — confirm.
    params_column = sa.Column("params", sa.PickleType(), nullable=True)
    # Batch mode rewrites the table where ALTER TABLE support is limited (e.g. SQLite).
    with op.batch_alter_table("dag_run") as batch_op:
        batch_op.add_column(params_column)


def downgrade():
    """Revert adding params column to DagRun"""
    # Batch mode rewrites the table where ALTER TABLE ... DROP COLUMN is not
    # natively supported (e.g. SQLite); dropping the column discards any stored params.
    with op.batch_alter_table("dag_run") as batch_op:
        batch_op.drop_column("params")
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"""Increase length of user identifier columns in ``ab_user`` and ``ab_register_user`` tables

Revision ID: 98ae134e6fff
Revises: 6abdffdd4815
Revises: e8a79aa51603
Create Date: 2023-01-18 16:21:09.420958

"""
Expand All @@ -32,7 +32,7 @@

# revision identifiers, used by Alembic.
revision = "98ae134e6fff"
down_revision = "6abdffdd4815"
down_revision = "e8a79aa51603"
branch_labels = None
depends_on = None
airflow_version = "2.6.0"
Expand Down
Loading