Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,18 +196,18 @@ def _collect_ids(nodes: list[dict[str, Any]]) -> set[str]:
return ids

existing_ids = _collect_ids(merged_nodes)
historical_task_ids = session.scalars(
select(TaskInstance.task_id)
historical_tasks = session.execute(
select(TaskInstance.task_id, TaskInstance.task_display_name)
.join(TaskInstance.dag_run)
.where(TaskInstance.dag_id == dag_id, DagRun.id.in_(run_ids))
.distinct()
)
for task_id in historical_task_ids:
for task_id, task_display_name in historical_tasks:
if task_id not in existing_ids:
merged_nodes.append(
{
"id": task_id,
"label": task_id,
"label": task_display_name,
"is_mapped": None,
"children": None,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from airflow.configuration import conf
from airflow.models.mappedoperator import MappedOperator, is_mapped
from airflow.sdk import TaskGroup
from airflow.serialization.serialized_objects import SerializedBaseOperator


Expand Down Expand Up @@ -90,15 +91,17 @@ def task_group_to_dict_grid(task_item_or_group, parent_group_is_mapped=False):
setup_teardown_type = "setup"
elif task.is_teardown is True:
setup_teardown_type = "teardown"
# we explicitly want the short task ID here, not the full doted notation if in a group
task_display_name = task.task_display_name if task.task_display_name != task.task_id else task.label
return {
"id": task.task_id,
"label": task.label,
"label": task_display_name,
"is_mapped": mapped,
"children": None,
"setup_teardown_type": setup_teardown_type,
}

task_group = task_item_or_group
task_group: TaskGroup = task_item_or_group
task_group_sort = get_task_group_children_getter()
mapped = is_mapped(task_group)
children = [
Expand All @@ -108,7 +111,7 @@ def task_group_to_dict_grid(task_item_or_group, parent_group_is_mapped=False):

return {
"id": task_group.group_id,
"label": task_group.label,
"label": task_group.group_display_name or task_group.label,
"is_mapped": mapped or None,
"children": children or None,
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ export const Header = ({
isRefreshing={isRefreshing}
state={taskInstance.state}
stats={stats}
subTitle={<Time datetime={taskInstance.start_date} />}
title={`${taskInstance.task_display_name}${taskInstance.map_index > -1 ? ` [${taskInstance.rendered_map_index ?? taskInstance.map_index}]` : ""}`}
/>
</Box>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,19 @@
"is_mapped": True,
"label": "mapped_task_group",
},
{"id": "task", "label": "task"},
{"id": "task", "label": "A Beautiful Task Name 🚀"},
{
"children": [
{
"children": [
{
"id": "task_group.inner_task_group.inner_task_group_sub_task",
"is_mapped": True,
"label": "inner_task_group_sub_task",
"label": "Inner Task Group Sub Task Label",
}
],
"id": "task_group.inner_task_group",
"label": "inner_task_group",
"label": "My Inner Task Group",
},
{"id": "task_group.mapped_task", "is_mapped": True, "label": "mapped_task"},
],
Expand All @@ -121,7 +121,7 @@ def setup(dag_maker, session=None):

# DAG 1
with dag_maker(dag_id=DAG_ID, serialized=True, session=session) as dag:
task = EmptyOperator(task_id=TASK_ID)
task = EmptyOperator(task_id=TASK_ID, task_display_name="A Beautiful Task Name 🚀")

@task_group
def mapped_task_group(arg1):
Expand All @@ -131,8 +131,10 @@ def mapped_task_group(arg1):

with TaskGroup(group_id=TASK_GROUP_ID):
MockOperator.partial(task_id=MAPPED_TASK_ID).expand(arg1=["a", "b", "c", "d"])
with TaskGroup(group_id=INNER_TASK_GROUP):
MockOperator.partial(task_id=INNER_TASK_GROUP_SUB_TASK).expand(arg1=["a", "b"])
with TaskGroup(group_id=INNER_TASK_GROUP, group_display_name="My Inner Task Group"):
MockOperator.partial(
task_id=INNER_TASK_GROUP_SUB_TASK, task_display_name="Inner Task Group Sub Task Label"
).expand(arg1=["a", "b"])

# Mapped but never expanded. API should not crash, but count this as one no-status ti.
MockOperator.partial(task_id=MAPPED_TASK_ID_2).expand(arg1=task.output)
Expand Down Expand Up @@ -480,19 +482,19 @@ def test_get_dag_structure(self, session, test_client):
"is_mapped": True,
"label": "mapped_task_group",
},
{"id": "task", "label": "task"},
{"id": "task", "label": "A Beautiful Task Name 🚀"},
{
"children": [
{
"children": [
{
"id": "task_group.inner_task_group.inner_task_group_sub_task",
"is_mapped": True,
"label": "inner_task_group_sub_task",
"label": "Inner Task Group Sub Task Label",
}
],
"id": "task_group.inner_task_group",
"label": "inner_task_group",
"label": "My Inner Task Group",
},
{"id": "task_group.mapped_task", "is_mapped": True, "label": "mapped_task"},
],
Expand Down
Loading