Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add task failed dependencies to details page. #38449

Merged
merged 8 commits into from
May 16, 2024

Conversation

tirkarthi
Copy link
Contributor

Show task failed dependencies when task is in scheduled or none state. None state helps to capture issues when the execution date is in future, depends_on_past fails with past task instance a failure etc.

closes: #38189
related: #38189

Scheduled state :

image

None state :

image

@boring-cyborg boring-cyborg bot added area:API Airflow's REST/HTTP API area:UI Related to UI/UX. For Frontend Developers. area:webserver Webserver related Issues labels Mar 25, 2024
@tirkarthi
Copy link
Contributor Author

I am getting below error in tests. It seems using get_airflow_app().dag_bag.get_dag(ti.dag_id) causes this error but I am not sure why.

    def test_should_respond_200(
        self, task_instances, update_extras, payload, expected_ti_count, username, session
    ):
        self.create_task_instances(
            session,
            update_extras=update_extras,
            task_instances=task_instances,
        )
>       response = self.client.post(
            "/api/v1/dags/~/dagRuns/~/taskInstances/list",
            environ_overrides={"REMOTE_USER": username},
            json=payload,
        )

tests/api_connexion/endpoints/test_task_instance_endpoint.py:910: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.env/lib/python3.10/site-packages/werkzeug/test.py:1146: in post
    return self.open(*args, **kw)
.env/lib/python3.10/site-packages/flask/testing.py:238: in open
    response = super().open(
.env/lib/python3.10/site-packages/werkzeug/test.py:1095: in open
    response = self.run_wsgi_app(request.environ, buffered=buffered)
.env/lib/python3.10/site-packages/werkzeug/test.py:962: in run_wsgi_app
    rv = run_wsgi_app(self.application, environ, buffered=buffered)
.env/lib/python3.10/site-packages/werkzeug/test.py:1243: in run_wsgi_app
    app_rv = app(environ, start_response)
.env/lib/python3.10/site-packages/flask/app.py:2552: in __call__
    return self.wsgi_app(environ, start_response)
.env/lib/python3.10/site-packages/flask/app.py:2532: in wsgi_app
    response = self.handle_exception(e)
.env/lib/python3.10/site-packages/flask/app.py:2529: in wsgi_app
    response = self.full_dispatch_request()
.env/lib/python3.10/site-packages/flask/app.py:1825: in full_dispatch_request
    rv = self.handle_user_exception(e)
.env/lib/python3.10/site-packages/flask/app.py:1823: in full_dispatch_request
    rv = self.dispatch_request()
.env/lib/python3.10/site-packages/flask/app.py:1799: in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
.env/lib/python3.10/site-packages/connexion/decorators/decorator.py:68: in wrapper
    response = function(request)
.env/lib/python3.10/site-packages/connexion/decorators/uri_parsing.py:149: in wrapper
    response = function(request)
.env/lib/python3.10/site-packages/connexion/decorators/validation.py:196: in wrapper
    response = function(request)
.env/lib/python3.10/site-packages/connexion/decorators/response.py:112: in wrapper
    response = function(request)
.env/lib/python3.10/site-packages/connexion/decorators/parameter.py:120: in wrapper
    return function(**kwargs)
airflow/api_connexion/security.py:165: in decorated
    return _requires_access(
airflow/api_connexion/security.py:92: in _requires_access
    return func(*args, **kwargs)
airflow/utils/session.py:79: in wrapper
    return func(*args, session=session, **kwargs)
airflow/api_connexion/endpoints/task_instance_endpoint.py:437: in get_task_instances_batch
    return task_instance_collection_schema.dump(
.env/lib/python3.10/site-packages/marshmallow/schema.py:549: in dump
    result = self._serialize(processed_obj, many=many)
.env/lib/python3.10/site-packages/marshmallow/schema.py:517: in _serialize
    value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
.env/lib/python3.10/site-packages/marshmallow/fields.py:340: in serialize
    return self._serialize(value, attr, obj, **kwargs)
.env/lib/python3.10/site-packages/marshmallow/fields.py:774: in _serialize
    return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
.env/lib/python3.10/site-packages/marshmallow/fields.py:774: in <listcomp>
    return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
.env/lib/python3.10/site-packages/marshmallow/fields.py:643: in _serialize
    return schema.dump(nested_obj, many=many)
.env/lib/python3.10/site-packages/marshmallow/schema.py:549: in dump
    result = self._serialize(processed_obj, many=many)
.env/lib/python3.10/site-packages/marshmallow/schema.py:517: in _serialize
    value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
.env/lib/python3.10/site-packages/marshmallow/fields.py:332: in serialize
    value = self.get_value(obj, attr, accessor=accessor)
.env/lib/python3.10/site-packages/marshmallow/fields.py:263: in get_value
    return accessor_func(obj, check_key, default)
airflow/api_connexion/schemas/task_instance_schema.py:89: in get_attribute
    return get_value(obj[0], attr, default)
.env/lib/python3.10/site-packages/marshmallow/utils.py:276: in get_value
    return _get_value_for_key(obj, key, default)
.env/lib/python3.10/site-packages/marshmallow/utils.py:290: in _get_value_for_key
    return getattr(obj, key, default)
.env/lib/python3.10/site-packages/sqlalchemy/ext/associationproxy.py:193: in __get__
    return inst.get(obj)
.env/lib/python3.10/site-packages/sqlalchemy/ext/associationproxy.py:575: in get
    target = getattr(obj, self.target_collection)
.env/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py:487: in __get__
    return self.impl.get(state, dict_)
.env/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py:959: in get
    value = self._fire_loader_callables(state, key, passive)
.env/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py:995: in _fire_loader_callables
    return self.callable_(state, passive)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <sqlalchemy.orm.strategies.LazyLoader object at 0x7f58c37a44a0>, state = <sqlalchemy.orm.state.InstanceState object at 0x7f58c2246680>
passive = symbol('PASSIVE_OFF'), loadopt = None, extra_criteria = ()

    def _load_for_state(self, state, passive, loadopt=None, extra_criteria=()):
        if not state.key and (
            (
                not self.parent_property.load_on_pending
                and not state._load_pending
            )
            or not state.session_id
        ):
            return attributes.ATTR_EMPTY
    
        pending = not state.key
        primary_key_identity = None
    
        use_get = self.use_get and (not loadopt or not loadopt._extra_criteria)
    
        if (not passive & attributes.SQL_OK and not use_get) or (
            not passive & attributes.NON_PERSISTENT_OK and pending
        ):
            return attributes.PASSIVE_NO_RESULT
    
        if (
            # we were given lazy="raise"
            self._raise_always
            # the no_raise history-related flag was not passed
            and not passive & attributes.NO_RAISE
            and (
                # if we are use_get and related_object_ok is disabled,
                # which means we are at most looking in the identity map
                # for history purposes or otherwise returning
                # PASSIVE_NO_RESULT, don't raise.  This is also a
                # history-related flag
                not use_get
                or passive & attributes.RELATED_OBJECT_OK
            )
        ):
    
            self._invoke_raise_load(state, passive, "raise")
    
        session = _state_session(state)
        if not session:
            if passive & attributes.NO_RAISE:
                return attributes.PASSIVE_NO_RESULT
    
>           raise orm_exc.DetachedInstanceError(
                "Parent instance %s is not bound to a Session; "
                "lazy load operation of attribute '%s' cannot proceed"
                % (orm_util.state_str(state), self.key)
            )
E           sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7f58c2246530> is not bound to a Session; lazy load operation of attribute 'task_instance_note' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)

.env/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py:863: DetachedInstanceError
--------------------------------------------------------------------------- Captured stdout setup ---------------------------------------------------------------------------
========================= AIRFLOW ==========================

@tirkarthi
Copy link
Contributor Author

From what I understand some of the methods like dagbag.get_dag use create_session to get the session which invalidates the session attached to ti that has to be available as marshmallow serializes. I tried using inspect to get the session and pass it along so that it's reused. But this more feels like a workaround. I was also curious if this can become a separate endpoint like extra_links endpoint where similar logic of using dagbag is also done. Also the consideration that task_failed_dep looks like a costly operation that it needs to be done for taskinstance schema which is frequently accessed in the UI through autorefresh and other parts though the check for task_failed_deps done is only for tasks in None and failed state.

@bbovenzi
Copy link
Contributor

Also the consideration that task_failed_dep looks like a costly operation that it needs to be done for taskinstance schema which is frequently accessed in the UI through autorefresh and other parts though the check for task_failed_deps done is only for tasks in None and failed state.

Also, the task_failed_deps really only matters when the task is in scheduled state. So a separate endpoint is fine with me.

@tirkarthi tirkarthi force-pushed the task-failed-deps-ui branch from ea1537f to dec2a99 Compare March 26, 2024 15:41
@tirkarthi
Copy link
Contributor Author

IMO, making this check for None state also helps in catching issues like depends_on_past=True, execution date is in future etc. where task doesn't get to scheduled state. After the separate endpoint I also realized the implementation has more code than expected initially. I tried timing the check and it comes around 2-3 milliseconds with 10 dags so it's not as costly as I thought from getting dagbag from airflow app but just had the session based subtle issues. So I am open to keeping this as additional field to the existing endpoint like first commit or as a separate endpoint or check to be done only when passing a query parameter to existing task instance endpoint like check_dependencies=True.

image

Thanks

@jscheffl
Copy link
Contributor

I really like this extension in the view! Looking forward for fixes and removal of merge conflict... then would like to review.

@bbovenzi
Copy link
Contributor

bbovenzi commented May 2, 2024

@tirkarthi Could you still rebase this? It would be great to get this merged soon so we can migrate all the /task pages.

@tirkarthi tirkarthi force-pushed the task-failed-deps-ui branch from dec2a99 to 0efdc5d Compare May 3, 2024 06:04
@tirkarthi
Copy link
Contributor Author

@bbovenzi Sorry, I got held up at work. Fixed the code comments to use a spinner like other pages and an alert on API error. Did a basic test with a mapped task which also seems to be working as expected. Screenshots as below.

Please note that I have enabled this check for tasks in None state as well as there are cases like execution_date is greater than end_date that will be useful. In the old page I see this table always being shown so please let me know if this is needed always or task in scheduled state or task in None/scheduled state.

Spinner during API call

image

API failed :

image

Table rendering :

image

Copy link
Contributor

@jscheffl jscheffl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it and it is a good next step to deprecate the leftover legacy task detail pages. Cool!

I found (besides small comments) a small glitch: The DAG dependecy panel is not included in the auto-refresh. I needed to hit F5 to get an update while status changed from "None" to "Scheduled" and "queued" - the page and panel was only refreshed when task went to "Success".

I think it would be reasonable to refresh the panel also together with the Auto Refresh cycle.

From manual testing - otherwise found no other glitch and LIKE it!

airflow/api_connexion/openapi/v1.yaml Outdated Show resolved Hide resolved
airflow/api_connexion/openapi/v1.yaml Outdated Show resolved Hide resolved
airflow/www/static/js/types/api-generated.ts Outdated Show resolved Hide resolved
airflow/www/static/js/types/api-generated.ts Outdated Show resolved Hide resolved
airflow/www/static/js/types/api-generated.ts Outdated Show resolved Hide resolved
airflow/www/static/js/types/api-generated.ts Outdated Show resolved Hide resolved
@tirkarthi tirkarthi force-pushed the task-failed-deps-ui branch from 0efdc5d to 03e253f Compare May 9, 2024 13:36
@tirkarthi
Copy link
Contributor Author

@jscheffl Thanks, updated the version number and regenerated the files. I have looked into auto refresh and found a pattern where auto refresh is done based on useAutoRefresh. The API useTaskFailedDependency doesn't have the taskinstance state based on which the refresh should happen. Once the task goes to queued state the showTaskSchedulingDependencies flag should return False which should hide the component in UI. @bbovenzi Any pointers here?

@bbovenzi
Copy link
Contributor

bbovenzi commented May 9, 2024

@jscheffl Thanks, updated the version number and regenerated the files. I have looked into auto refresh and found a pattern where auto refresh is done based on useAutoRefresh. The API useTaskFailedDependency doesn't have the taskinstance state based on which the refresh should happen. Once the task goes to queued state the showTaskSchedulingDependencies flag should return False which should hide the component in UI. @bbovenzi Any pointers here?

I think useTaskFailedDependency can just use the normal refresh logic: refetchInterval: isRefreshOn && (autoRefreshInterval || 1) * 1000,. The API hook doesn't need to know when the state changes.

@tirkarthi tirkarthi force-pushed the task-failed-deps-ui branch from 03e253f to f446844 Compare May 16, 2024 12:40
@tirkarthi
Copy link
Contributor Author

@bbovenzi Thanks, added auto refresh as suggested and rebased with latest main branch.

Copy link
Contributor

@jscheffl jscheffl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did a regression test - great - LIKE it!

@jscheffl jscheffl merged commit 448707c into apache:main May 16, 2024
41 checks passed
@eladkal eladkal added this to the Airflow 2.10.0 milestone May 20, 2024
@eladkal eladkal added the type:improvement Changelog: Improvements label May 20, 2024
romsharon98 pushed a commit to romsharon98/airflow that referenced this pull request Jul 26, 2024
* Add task failed dependencies to details page.

* Fix tests.

* Explicitly pass attached session for reuse so that the session is available during serialization.

* Use separate endpoint for task failed dependencies.

* Fix conditional.

* Add spinner during loading and handle error scenario.

* Update version number in API.

* Auto refresh dependencies.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:API Airflow's REST/HTTP API area:UI Related to UI/UX. For Frontend Developers. area:webserver Webserver related Issues type:improvement Changelog: Improvements
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Show task_failed_deps on for scheduled tasks in Task Instance Details panel
4 participants