diff --git a/src/prefect/engine.py b/src/prefect/engine.py index ad19704626a0..5595cdcec11a 100644 --- a/src/prefect/engine.py +++ b/src/prefect/engine.py @@ -1653,7 +1653,13 @@ def resolve_input(expr, context): # Do not allow uncompleted upstreams except failures when `allow_failure` has # been used if not state.is_completed() and not ( - isinstance(context.get("annotation"), allow_failure) and state.is_failed() + # TODO: Note that the contextual annotation here is only at the current level + # if `allow_failure` is used then another annotation is used, this will + # incorrectly evaulate to false — to resolve this, we must track all + # annotations wrapping the current expression but this is not yet + # implemented. + isinstance(context.get("annotation"), allow_failure) + and state.is_failed() ): raise UpstreamTaskError( f"Upstream task run '{state.state_details.task_run_id}' did not reach a 'COMPLETED' state."