Add support for labelling DAG edges #15142
I'm not a fan of this import in user DAGs (they generally don't import from models currently), but I don't have an immediate alternative to suggest. (This is just a comment on the "user" import, not on the code location.)
Yeah, I went trawling around some example DAGs to find what path might fit, and I saw a couple with |
To add to your Todo list: update the SerialisedDag representations to include this |
That's actually done already, or it wouldn't make it across to the UI. Unless there's extra work above and beyond just making it appear in the schema & json? |
Nope, I just missed it then! GitHub mobile is not the best at displaying PRs |
airflow/serialization/schema.json
@@ -102,7 +102,8 @@
     "_task_group": {"anyOf": [
       { "type": "null" },
       { "$ref": "#/definitions/task_group" }
-    ]}
+    ]},
+    "edge_info": { "$ref": "#/definitions/dict" }
Oh this could maybe be specified a bit tighter. WDYT @kaxil ?
Yeah, similar to how we only store _downstream_task_ids (not _upstream_task_ids) for a task, and set the upstream side on the other task when deserializing. We do that to keep the serialized blob as small as possible.
airflow/airflow/serialization/serialized_objects.py
Lines 451 to 452 in 8cc8d11
    if k == "_downstream_task_ids":
        v = set(v)
airflow/airflow/serialization/serialized_objects.py
Lines 666 to 668 in 8cc8d11
    for k, v in encoded_dag.items():
        if k == "_downstream_task_ids":
            v = set(v)
airflow/airflow/serialization/serialized_objects.py
Lines 724 to 727 in 8cc8d11
    for task_id in serializable_task.downstream_task_ids:
        # Bypass set_upstream etc here - it does more than we want
        # noqa: E501 # pylint: disable=protected-access
        dag.task_dict[task_id]._upstream_task_ids.add(serializable_task.task_id)
airflow/airflow/models/baseoperator.py
Lines 1365 to 1370 in 8cc8d11
    cls.__serialized_fields = frozenset(
        vars(BaseOperator(task_id='test')).keys()
        - {
            'inlets',
            'outlets',
            '_upstream_task_ids',
We pay the price of hardcoding it, but it is worth it: we can save MBs (which would otherwise be transmitted to and from the database) when the number of tasks is huge.
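To make the trade-off above concrete, here is a minimal sketch of the idea: only the downstream IDs are written out, and the upstream sets are rebuilt on load. The `serialize` and `deserialize` helpers and the plain-dict task model are hypothetical and only illustrate the technique; they are not the code in serialized_objects.py.

```python
import json


def serialize(downstream):
    """Store only the downstream side of each edge (lists sorted for stable output)."""
    return json.dumps({task_id: sorted(ids) for task_id, ids in downstream.items()})


def deserialize(blob):
    """Rebuild both directions from the downstream-only blob."""
    downstream = {task_id: set(ids) for task_id, ids in json.loads(blob).items()}
    upstream = {task_id: set() for task_id in downstream}
    for task_id, ids in downstream.items():
        for child in ids:
            # Each stored downstream edge implies exactly one upstream edge.
            upstream.setdefault(child, set()).add(task_id)
    return downstream, upstream


tasks = {"a": {"b", "c"}, "b": {"c"}, "c": set()}
blob = serialize(tasks)
restored_downstream, restored_upstream = deserialize(blob)
```

Every edge appears once in the blob instead of twice, which is where the size saving comes from on DAGs with many tasks.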
Sure, I'll add a slightly more compact representation. I can't save any actual data, since source-target-label is all that's stored, but we can at least save the outer wrapping with _type etc.?
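As a rough illustration of what dropping the wrapping could save, the sketch below compares a type-wrapped encoding of a nested source-target-label mapping with the same mapping written as plain JSON. The wrapper keys `__type` and `__var`, the `wrap` helper, and the shape of `edge_info` are all assumptions made for this example, not the schema that was merged.

```python
import json

# Hypothetical edge metadata: upstream task -> downstream task -> info.
edge_info = {
    "check": {
        "ok": {"label": "No errors"},
        "fail": {"label": "Errors found"},
    }
}


def wrap(value):
    """Recursively wrap every dict in a type envelope (illustrative only)."""
    if isinstance(value, dict):
        return {"__type": "dict", "__var": {k: wrap(v) for k, v in value.items()}}
    return value


wrapped = json.dumps(wrap(edge_info))
compact = json.dumps(edge_info)
saved_bytes = len(wrapped) - len(compact)
```

The stored data is identical in both forms; only the per-dict envelope disappears, so the saving grows with the number of labelled edges.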
How about hiding the |
I'd not be a huge fan of that design for two reasons:
|
Can you give an example of a long description that has to use How about Though I am a bit hesitant to introduce another special operator for Airflow, I think it's still better rather than introducing a
|
I would argue that this fulfills a similar role as task groups - it's a primarily UI-focused feature, but it's also DAG metadata and should live in it. If it's not in the DAG, where would it go where it can appear both in the webserver view and in the output from I'm open to changing the user interface here - this is just a rough idea me and @ashb discussed - but I really don't like moving to just a plain I'm also not sure you could make this work:
|
I don’t think |
When doing operators for adding labels it would be good to check if they work with lineage operators ( @andrewgodwin would you mind elaborating more about the use case for the labels? In my understanding all the actions are performed by tasks and edges in DAGs represent only order (and sometimes data) relation. Adding a label to edge would suggest that some action is performed between tasks. Is it done by XCom or something? |
The idea for labels is merely to give the user a visual indication of what each edge means - this can be especially important, IMO, when using the branching operators or similar, on larger DAGs. It has no runtime effect. I don't think I'd even want to add anything with a runtime effect on edges; Airflow is designed around all the runtime info (priority, etc.) being at the Task level and I like that. The only thing I could foresee adding is other informational data, such as a longer "description" field. The other option to achieve this is to take this, task groups, and any other informational-only parts of Airflow and spin them off into a separate presentation layer somehow, and while I do quite like separating presentation and logic, I think that would be too unwieldy to actually be useful.
6201e42 to dffd368
dffd368 to 7956f6c
7956f6c to 743a31a
(ignore it being ready for review, I clicked that before remembering the serialisation needs fixing) |
The Workflow run is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason. |
bbbdaff to b80d879
There are a couple of visual tweaks we should add when hovering over tasks or statuses (e.g. fading out labels). I'll provide a solution, as I was just modifying these interactions in #15257.
As well as grouping tasks into groups, you can also label the edges between
different tasks in the Graph View - this can be especially useful for branching
areas of your DAG, so you can label the conditions under which certain branches
might run.
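One way to picture how a label could sit between two tasks in a `>>` chain is the toy model below. The `Task` and `Label` classes here are hypothetical stand-ins written for this sketch, not Airflow's implementation or import path; the point is only that the label object remembers its upstream task and records the edge metadata when it meets the downstream one.

```python
class Label:
    """Toy edge label: carries text from the upstream task to the downstream one."""

    def __init__(self, text):
        self.text = text
        self.upstream = None

    def __rshift__(self, task):
        # Record the labelled edge on the shared edge map, then keep chaining.
        edges = self.upstream.edge_info
        edges.setdefault(self.upstream.task_id, {})[task.task_id] = {"label": self.text}
        return task


class Task:
    """Toy task that shares one edge_info dict with the other tasks in its DAG."""

    def __init__(self, task_id, edge_info):
        self.task_id = task_id
        self.edge_info = edge_info

    def __rshift__(self, other):
        if isinstance(other, Label):
            other.upstream = self
            return other
        # Plain edge with no metadata.
        self.edge_info.setdefault(self.task_id, {})[other.task_id] = {}
        return other


edges = {}
check, ok, fail = Task("check", edges), Task("ok", edges), Task("fail", edges)
check >> Label("No errors") >> ok
check >> Label("Errors found") >> fail
```

Because the label only writes metadata, it has no effect on ordering or execution, which matches the stated intent that labels are purely informational.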
nit: Probably a screenshot in this doc might be helpful
Don't know how I didn't think about that one given it's an entirely visual feature! I'll get it in tomorrow.
I'm gonna merge this PR so we don't end up in CI hell, but @andrewgodwin please make another PR with this screenshot if you think that will help
@andrewgodwin can you also add some Edge Labels to a few of the DAGs in airflow/example_dags/ in the same PR? It would be useful for this feature to be exposed during local development.
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
I opened #15298 to address the aforementioned visual enhancements needed for this feature. |
Part of the resolution of #15140 (paired with #15142). "Edge labels" are an existing concept in the library that powers the Graph view. This feature will be employed in the Airflow Graph view with #15142. This PR ensures that the labels are displayed properly when interacting with the Task/path/status highlighting features of the Graph. Primarily, it fades out the labels when they are not relevant.
I've opened #15310 to get the extra examples in. |
This adds support for putting human-readable labels on edges in the DAG between Tasks, as well as making the underlying framework for that generic enough that future metadata could be added if desired.
What's left to do:
It modifies both the GraphViz and the D3 renderers - example:
closes: #15140
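For readers unfamiliar with how an edge label reaches a GraphViz-style renderer, the sketch below emits DOT text in which each labelled edge gets a `label` attribute. The `to_dot` function and the nested `edge_info` mapping are assumptions made for illustration; this is not the renderer code changed in the PR.

```python
def to_dot(downstream, edge_info):
    """Render a task graph as DOT, attaching a label attribute where one exists."""
    lines = ["digraph dag {"]
    for src in sorted(downstream):
        for dst in sorted(downstream[src]):
            label = edge_info.get(src, {}).get(dst, {}).get("label")
            attrs = ""
            if label:
                # Escape double quotes so the label stays a valid DOT string.
                escaped = label.replace('"', '\\"')
                attrs = f' [label="{escaped}"]'
            lines.append(f'  "{src}" -> "{dst}"{attrs};')
    lines.append("}")
    return "\n".join(lines)


downstream = {"check": {"ok", "fail"}, "ok": {"done"}, "fail": set(), "done": set()}
edge_info = {"check": {"ok": {"label": "No errors"}, "fail": {"label": "Errors found"}}}
dot = to_dot(downstream, edge_info)
```

Edges without metadata are emitted unchanged, so unlabelled DAGs would render exactly as before.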