Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show custom instance names for a mapped task in UI #36797

Merged
merged 28 commits into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5ec12b7
draft for map_index_template to overwrite map_index in the UI
RNHTTR Jan 15, 2024
4d49f79
add rendered_map_index to TI table based on map_index_template
RNHTTR Jan 20, 2024
b026f33
address comments
RNHTTR Jan 23, 2024
f2d853e
rebase
RNHTTR Jan 23, 2024
4d7ac49
rebase
RNHTTR Jan 23, 2024
892d2a4
add tests
RNHTTR Jan 26, 2024
dc63d1a
fix static checks
RNHTTR Jan 27, 2024
cb0c85b
fixing some tests
RNHTTR Jan 28, 2024
9a83d40
handle race condition between context update and assigning values to …
RNHTTR Feb 7, 2024
2aeb032
add docs example
RNHTTR Feb 7, 2024
7ea351f
remove rogue file
RNHTTR Feb 8, 2024
16734c5
remove rogue file; fix failing test
RNHTTR Feb 8, 2024
fd4269a
Merge branch 'main' into custom-mapped-task-indices
uranusjr Feb 21, 2024
b4ee1fb
Fix task instance dump view test
uranusjr Feb 21, 2024
67804c9
Add rendered_map_index to REST API
uranusjr Feb 21, 2024
dfdf8a7
Add map_index_template to serialization format
uranusjr Feb 21, 2024
6619c58
Fix map_index_template use in classic partial
uranusjr Feb 21, 2024
ea5a932
Fix MappedOperator map_index_template serde
uranusjr Feb 21, 2024
38f77df
Fix refresh_from_db entry in test
uranusjr Feb 21, 2024
bbf78dc
Simplify expanding code
uranusjr Feb 21, 2024
d85c2f0
More documentation on map_index_template
uranusjr Feb 21, 2024
1e8f3f5
reflect mapped ti label in UI
RNHTTR Feb 21, 2024
025573f
remove console.log
RNHTTR Feb 21, 2024
00e45a4
Update airflow/www/static/js/dag/details/taskInstance/MappedInstances…
RNHTTR Feb 22, 2024
2779f68
render custom map index in other locations
RNHTTR Feb 22, 2024
12ad5d7
Merge branch 'main' into custom-mapped-task-indices
RNHTTR Feb 22, 2024
3984384
fix UI static checks
RNHTTR Feb 22, 2024
e44fb3f
Merge branch 'main' into custom-mapped-task-indices
RNHTTR Feb 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3582,6 +3582,13 @@ components:
type: string
sla_miss:
$ref: "#/components/schemas/SLAMiss"
rendered_map_index:
description: |
Rendered name of an expanded task instance, if the task is mapped.

*New in version 2.9.0*
type: string
nullable: true
rendered_fields:
description: |
JSON object describing rendered fields.
Expand Down
1 change: 1 addition & 0 deletions airflow/api_connexion/schemas/task_instance_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Meta:
executor_config = auto_field()
note = auto_field()
sla_miss = fields.Nested(SlaMissSchema, dump_default=None)
rendered_map_index = auto_field()
rendered_fields = JsonObjectField(dump_default={})
trigger = fields.Nested(TriggerSchema)
triggerer_job = fields.Nested(JobSchema)
Expand Down
1 change: 1 addition & 0 deletions airflow/decorators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ def _expand(self, expand_input: ExpandInput, *, strict: bool) -> XComArg:
expand_input=EXPAND_INPUT_EMPTY, # Don't use this; mapped values go to op_kwargs_expand_input.
partial_kwargs=partial_kwargs,
task_id=task_id,
map_index_template=partial_kwargs.pop("map_index_template", None),
params=partial_params,
deps=MappedOperator.deps_for(self.operator_class),
operator_extra_links=self.operator_class.operator_extra_links,
Expand Down
5 changes: 5 additions & 0 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ def partial(
priority_weight: int | ArgNotSet = NOTSET,
weight_rule: str | ArgNotSet = NOTSET,
sla: timedelta | None | ArgNotSet = NOTSET,
map_index_template: str | None | ArgNotSet = NOTSET,
max_active_tis_per_dag: int | None | ArgNotSet = NOTSET,
max_active_tis_per_dagrun: int | None | ArgNotSet = NOTSET,
on_execute_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] | ArgNotSet = NOTSET,
Expand Down Expand Up @@ -289,6 +290,7 @@ def partial(
"dag": dag,
"task_group": task_group,
"task_id": task_id,
"map_index_template": map_index_template,
"start_date": start_date,
"end_date": end_date,
"owner": owner,
Expand Down Expand Up @@ -781,6 +783,7 @@ def __init__(
resources: dict[str, Any] | None = None,
run_as_user: str | None = None,
task_concurrency: int | None = None,
map_index_template: str | None = None,
RNHTTR marked this conversation as resolved.
Show resolved Hide resolved
max_active_tis_per_dag: int | None = None,
max_active_tis_per_dagrun: int | None = None,
executor_config: dict | None = None,
Expand Down Expand Up @@ -933,6 +936,7 @@ def __init__(
self.max_active_tis_per_dag: int | None = max_active_tis_per_dag
self.max_active_tis_per_dagrun: int | None = max_active_tis_per_dagrun
self.do_xcom_push: bool = do_xcom_push
self.map_index_template: str | None = map_index_template
self.multiple_outputs: bool = multiple_outputs

self.doc_md = doc_md
Expand Down Expand Up @@ -1572,6 +1576,7 @@ def get_serialized_fields(cls):
"is_setup",
"is_teardown",
"on_failure_fail_dagrun",
"map_index_template",
}
)
DagContext.pop_context_managed_dag()
Expand Down
2 changes: 2 additions & 0 deletions airflow/models/mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def _expand(self, expand_input: ExpandInput, *, strict: bool) -> MappedOperator:
expand_input=expand_input,
partial_kwargs=partial_kwargs,
task_id=task_id,
map_index_template=partial_kwargs.pop("map_index_template", None),
params=self.params,
deps=MappedOperator.deps_for(self.operator_class),
operator_extra_links=self.operator_class.operator_extra_links,
Expand Down Expand Up @@ -280,6 +281,7 @@ class MappedOperator(AbstractOperator):
end_date: pendulum.DateTime | None
upstream_task_ids: set[str] = attr.ib(factory=set, init=False)
downstream_task_ids: set[str] = attr.ib(factory=set, init=False)
map_index_template: str | None

_disallow_kwargs_override: bool
"""Whether execution fails if ``expand_input`` has duplicates to ``partial_kwargs``.
Expand Down
21 changes: 18 additions & 3 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,7 @@ def get_triggering_events() -> dict[str, list[DatasetEvent | DatasetEventPydanti
"inlets": task.inlets,
"logical_date": logical_date,
"macros": macros,
"map_index_template": task.map_index_template,
"next_ds": get_next_ds(),
"next_ds_nodash": get_next_ds_nodash(),
"next_execution_date": get_next_execution_date(),
Expand Down Expand Up @@ -1252,6 +1253,7 @@ class TaskInstance(Base, LoggingMixin):
pid = Column(Integer)
executor_config = Column(ExecutorConfigType(pickler=dill))
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow)
rendered_map_index = Column(String(64))

external_executor_id = Column(StringID())

Expand Down Expand Up @@ -2512,7 +2514,12 @@ def signal_handler(signum, frame):
self.task.params = context["params"]

with set_current_context(context):
task_orig = self.render_templates(context=context)
dag = self.task.get_dag()
if dag is not None:
jinja_env = dag.get_template_env()
else:
jinja_env = None
task_orig = self.render_templates(context=context, jinja_env=jinja_env)

if not test_mode:
rtif = RenderedTaskInstanceFields(ti=self, render_templates=False)
Expand Down Expand Up @@ -2547,10 +2554,16 @@ def signal_handler(signum, frame):
# Execute the task
with set_current_context(context):
result = self._execute_task(context, task_orig)

# Run post_execute callback
# Is never MappedOperator at this point
self.task.post_execute(context=context, result=result) # type: ignore[union-attr]

# DAG authors define map_index_template at the task level
if jinja_env is not None and (template := context.get("map_index_template")) is not None:
rendered_map_index = self.rendered_map_index = jinja_env.from_string(template).render(context)
self.log.info("Map index rendered as %s", rendered_map_index)

Stats.incr(f"operator_successes_{self.task.task_type}", tags=self.stats_tags)
# Same metric with tagging
Stats.incr("operator_successes", tags={**self.stats_tags, "task_type": self.task.task_type})
Expand Down Expand Up @@ -2922,7 +2935,9 @@ def overwrite_params_with_dag_run_conf(self, params: dict, dag_run: DagRun):
self.log.debug("Updating task params (%s) with DagRun.conf (%s)", params, dag_run.conf)
params.update(dag_run.conf)

def render_templates(self, context: Context | None = None) -> Operator:
def render_templates(
self, context: Context | None = None, jinja_env: jinja2.Environment | None = None
) -> Operator:
"""Render templates in the operator fields.

If the task was originally mapped, this may replace ``self.task`` with
Expand All @@ -2937,7 +2952,7 @@ def render_templates(self, context: Context | None = None) -> Operator:
# unmapped BaseOperator created by this function! This is because the
# MappedOperator is useless for template rendering, and we need to be
# able to access the unmapped task instead.
original_task.render_template_fields(context)
original_task.render_template_fields(context, jinja_env)

return original_task

Expand Down
1 change: 1 addition & 0 deletions airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
"ds_nodash",
"expanded_ti_count",
"inlets",
"map_index_template",
"next_ds",
"next_ds_nodash",
"outlets",
Expand Down
1 change: 1 addition & 0 deletions airflow/serialization/pydantic/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
pid: Optional[int]
executor_config: Any
updated_at: Optional[datetime]
rendered_map_index: Optional[str]
external_executor_id: Optional[str]
trigger_id: Optional[int]
trigger_timeout: Optional[datetime]
Expand Down
3 changes: 2 additions & 1 deletion airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@
"_log_config_logger_name": { "type": "string" },
"_is_mapped": { "const": true, "$comment": "only present when True" },
"expand_input": { "type": "object" },
"partial_kwargs": { "type": "object" }
"partial_kwargs": { "type": "object" },
"map_index_template": { "type": "string" }
},
"dependencies": {
"expand_input": ["partial_kwargs", "_is_mapped"],
Expand Down
1 change: 1 addition & 0 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,7 @@ def deserialize_operator(cls, encoded_op: dict[str, Any]) -> Operator:
task_group=None,
start_date=None,
end_date=None,
map_index_template=None,
RNHTTR marked this conversation as resolved.
Show resolved Hide resolved
disallow_kwargs_override=encoded_op["_disallow_kwargs_override"],
expand_input_attr=encoded_op["_expand_input_attr"],
)
Expand Down
1 change: 1 addition & 0 deletions airflow/utils/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"inlets",
"logical_date",
"macros",
"map_index_template",
uranusjr marked this conversation as resolved.
Show resolved Hide resolved
"next_ds",
"next_ds_nodash",
"next_execution_date",
Expand Down
1 change: 1 addition & 0 deletions airflow/utils/context.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class Context(TypedDict, total=False):
inlets: list
logical_date: DateTime
macros: Any
map_index_template: str
next_ds: str | None
next_ds_nodash: str | None
next_execution_date: DateTime | None
Expand Down
2 changes: 1 addition & 1 deletion airflow/www/static/js/components/Table/Cells.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import { getMetaValue } from "src/utils";
import { useContainerRef } from "src/context/containerRef";
import { SimpleStatus } from "src/dag/StatusBox";

interface CellProps {
export interface CellProps {
cell: {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
value: any;
Expand Down
16 changes: 14 additions & 2 deletions airflow/www/static/js/dag/details/Header.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
import { getDagRunLabel, getMetaValue, getTask } from "src/utils";
import useSelection from "src/dag/useSelection";
import Time from "src/components/Time";
import { useGridData } from "src/api";
import { useGridData, useTaskInstance } from "src/api";
import RunTypeIcon from "src/components/RunTypeIcon";

import BreadcrumbText from "./BreadcrumbText";
Expand All @@ -45,6 +45,15 @@ const Header = () => {
onSelect,
clearSelection,
} = useSelection();

const { data: taskInstance } = useTaskInstance({
dagId,
dagRunId: runId || "",
taskId: taskId || "",
mapIndex,
enabled: mapIndex !== undefined,
});

const dagRun = dagRuns.find((r) => r.runId === runId);

const group = getTask({ taskId, task: groups });
Expand Down Expand Up @@ -131,7 +140,10 @@ const Header = () => {
<BreadcrumbLink
_hover={isMappedTaskDetails ? { cursor: "default" } : undefined}
>
<BreadcrumbText label="Map Index" value={mapIndex} />
<BreadcrumbText
label="Map Index"
value={taskInstance?.renderedMapIndex || mapIndex}
/>
</BreadcrumbLink>
</BreadcrumbItem>
)}
Expand Down
6 changes: 6 additions & 0 deletions airflow/www/static/js/dag/details/taskInstance/Details.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ const Details = ({ gridInstance, taskInstance, group }: Props) => {
<Td>{taskInstance.mapIndex}</Td>
</Tr>
)}
{taskInstance?.renderedMapIndex !== undefined && (
<Tr>
<Td>Rendered Map Index</Td>
<Td>{taskInstance.renderedMapIndex}</Td>
</Tr>
)}
{!!taskInstance?.tryNumber && (
<Tr>
<Td>Try Number</Td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { StatusWithNotes } from "src/dag/StatusBox";
import { Table } from "src/components/Table";
import Time from "src/components/Time";
import { useOffsetTop } from "src/utils";
import type { CellProps } from "src/components/Table";

interface Props {
dagId: string;
Expand Down Expand Up @@ -69,6 +70,7 @@ const MappedInstances = ({ dagId, runId, taskId, onRowClicked }: Props) => {
() =>
taskInstances.map((mi) => ({
...mi,
renderedMapIndex: mi.renderedMapIndex,
state: (
<Flex alignItems="center">
<StatusWithNotes
Expand All @@ -94,6 +96,8 @@ const MappedInstances = ({ dagId, runId, taskId, onRowClicked }: Props) => {
{
Header: "Map Index",
accessor: "mapIndex",
Cell: ({ cell: { row } }: CellProps) =>
row.original.renderedMapIndex || row.original.mapIndex,
},
{
Header: "State",
Expand Down
6 changes: 6 additions & 0 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1443,6 +1443,12 @@ export interface components {
pid?: number | null;
executor_config?: string;
sla_miss?: components["schemas"]["SLAMiss"];
/**
* @description Rendered name of an expanded task instance, if the task is mapped.
*
* *New in version 2.9.0*
*/
rendered_map_index?: string | null;
/**
* @description JSON object describing rendered fields.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,44 @@ As well as a single parameter it is possible to pass multiple parameters to expa

This would result in the add task being called 6 times. Please note, however, that the order of expansion is not guaranteed.

Named mapping
-------------

By default, mapped tasks are assigned an integer index. It is possible to override the integer index for each mapped task in the Airflow UI with a name based on the task's input. This is done by providing a Jinja template for the task with ``map_index_template``. This template is rendered after each expanded task is executed using the task context. This means you can reference attributes on the task like this:

.. code-block:: python

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


# The two expanded task instances will be named "2024-01-01" and "2024-01-02".
SQLExecuteQueryOperator.partial(
...,
sql="SELECT * FROM data WHERE date = %(date)s",
map_index_template="""{{ task.parameters['date'] }}""",
).expand(
parameters=[{"date": "2024-01-01"}, {"date": "2024-01-02"}],
)

In the above example, the expanded task instances will be named "2024-01-01" and "2024-01-02". The names show up in the Airflow UI instead of "0" and "1", respectively.

Since the template is rendered after the main execution block, it is possible to also dynamically inject into the rendering context. This is useful when the logic to render a desirable name is difficult to express in the Jinja template syntax, particularly in a taskflow function. For example:

.. code-block:: python

from airflow.operators.python import get_current_context


@task(map_index_template="{{ my_variable }}")
def my_task(my_value: str):
context = get_current_context()
context["my_variable"] = my_value * 3
... # Normal execution...


# The task instances will be named "aaa" and "bbb".
my_task.expand(my_value=["a", "b"])

Mapping with non-TaskFlow operators
===================================

Expand Down
1 change: 1 addition & 0 deletions docs/apache-airflow/templates-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Variable Type Description
``{{ run_id }}`` str The currently running :class:`~airflow.models.dagrun.DagRun` run ID.
``{{ dag_run }}`` DagRun The currently running :class:`~airflow.models.dagrun.DagRun`.
``{{ test_mode }}`` bool Whether the task instance was run by the ``airflow test`` CLI.
``{{ map_index_template }}`` None | str Template used to render the expanded task instance of a mapped task. Setting this value will be reflected in the rendered result.
``{{ expanded_ti_count }}`` int | ``None`` | Number of task instances that a mapped task was expanded into. If
| the current task is not mapped, this should be ``None``.
| Added in version 2.5.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ def test_mapped_task_instances(self, one_task_with_mapped_tis, session):
"queue": "default",
"queued_when": None,
"rendered_fields": {},
"rendered_map_index": None,
"sla_miss": None,
"start_date": "2020-01-01T00:00:00+00:00",
"state": "success",
Expand Down
Loading