@gopidesupavan
Member

closes: #47949

image

@gopidesupavan
Member Author

@kaxil For now I have fixed only DagStateTrigger. While testing, I observed a few issues with ExternalTaskSensor and WorkflowTrigger. The date calculation needs to change here: https://github.com/apache/airflow/pull/48651/files#diff-cd0a81a3fe4e14f46d41d402eabcb273554245c7e676e549b8b84030a99d4870R255 . The logical_date used is that of the current DAG, but the external task/DAG being monitored can have a different logical date, so these queries fail.

Additionally, the logical_date argument is not available in WorkflowTrigger. Overall I feel it needs a good amount of testing, so I will do it separately; it may not be possible for the RC.

logical_dates=self._get_dttm_filter(context),

At least I would like DagStateTrigger to be ready for review and merge for the RC, so I raised it here.
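
To illustrate the logical date mismatch described above, here is a minimal sketch in plain Python. It is not Airflow code: `RUNS`, `target_logical_date` and `lookup_state` are hypothetical names, and the one-hour offset is an assumed example of an external DAG whose logical date differs from the monitoring DAG's by a fixed delta.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

# Hypothetical in-memory stand-in for the DAG run table:
# (dag_id, logical_date) -> run state.
RUNS = {
    ("external_dag", datetime(2025, 4, 3, 0, 0, tzinfo=timezone.utc)): "success",
}


def target_logical_date(current: datetime, execution_delta: timedelta | None) -> datetime:
    """Return the logical date to look up in the external DAG.

    Assumes the external run is offset from the current run by a fixed delta.
    """
    return current if execution_delta is None else current - execution_delta


def lookup_state(dag_id: str, logical_date: datetime) -> str | None:
    """Look up a run state by (dag_id, logical_date); None if no such run exists."""
    return RUNS.get((dag_id, logical_date))


# Logical date of the monitoring (current) DAG run.
current = datetime(2025, 4, 3, 1, 0, tzinfo=timezone.utc)
delta = timedelta(hours=1)

# Querying with the current DAG's logical date finds nothing ...
missed = lookup_state("external_dag", current)
# ... while querying with the adjusted date finds the external run.
found = lookup_state("external_dag", target_logical_date(current, delta))
```

The point is only that the lookup key has to be the external run's logical date; how the real trigger should receive that value is left open in this thread.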

@gopidesupavan
Member Author

Strange failure 🤔

  Provider pgvector is not excluded.
  warning: The direct dependency `types-protobuf` is unpinned. Consider setting a lower bound when using `--resolution lowest` to avoid using outdated versions.
    × Failed to download and build `pyspark==3.1.3`
    ├─▶ Failed to extract archive
    ├─▶ failed to unpack
    │   `/root/.cache/uv/sdists-v9/.tmpD0fTFO/pyspark-3.1.3/deps/jars/hive-exec-2.3.7-core.jar`
    ├─▶ failed to unpack `pyspark-3.1.3/deps/jars/hive-exec-2.3.7-core.jar` into
    │   `/root/.cache/uv/sdists-v9/.tmpD0fTFO/pyspark-3.1.3/deps/jars/hive-exec-2.3.7-core.jar`
    ├─▶ error decoding response body
    ├─▶ request or response body error
    ├─▶ error reading a body from connection
    ╰─▶ stream closed because of a broken pipe
    help: `pyspark` (v3.1.3) was included because `apache-airflow[all]`
          (v3.0.0.dev0) depends on `apache-airflow-providers-apache-spark`
          (v5.1.1) which depends on `pyspark>=3.1.3`
  Test failed with 1. Dumping logs

@gopidesupavan gopidesupavan force-pushed the fix-dagstate-trigger branch from 549ba35 to 29c0f4f Compare April 3, 2025 17:27
@ashb
Member

ashb commented Apr 3, 2025

    ├─▶ request or response body error
    ├─▶ error reading a body from connection
    ╰─▶ stream closed because of a broken pipe

@gopidesupavan Known issue, those are flakey right now.

@gopidesupavan
Member Author

ah okay thanks for notifying

@gopidesupavan
Member Author

gopidesupavan commented Apr 3, 2025

@ashb I'm not sure SUPERVISOR_COMMS.lock is working properly. I can see some RunTrigger messages arriving in the context file here: https://github.com/apache/airflow/pull/48747/files#diff-c53fa19ab123562d2f40abb3fead96105362003d0da4accb44e4e80ccb3cb1a0R677

The issue occurs only when I trigger multiple runs in quick succession. It looks like the messages are getting mixed up?
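
For readers unfamiliar with this kind of failure, here is a minimal asyncio sketch of how replies on a shared request/response channel can get mixed up when several callers use it concurrently. It illustrates the general pattern only and is not a diagnosis of the Airflow code: `FakeComms`, `ask`, `run` and `demo` are hypothetical names, and the delays are chosen to make the interleaving deterministic.

```python
import asyncio


class FakeComms:
    """Hypothetical shared channel: replies are queued in the order requests arrive."""

    def __init__(self):
        self._replies = asyncio.Queue()
        self.lock = asyncio.Lock()

    async def send(self, request):
        # The "other side" answers immediately; the reply waits in the queue.
        self._replies.put_nowait(f"reply-to-{request}")

    async def read(self):
        # Returns whatever reply is next, regardless of who asked for it.
        return await self._replies.get()


async def ask(comms, request, delay, use_lock):
    async def round_trip():
        await comms.send(request)
        await asyncio.sleep(delay)  # another coroutine may run here
        return await comms.read()

    if use_lock:
        # Hold the lock for the whole send + read round trip.
        async with comms.lock:
            reply = await round_trip()
    else:
        reply = await round_trip()
    return request, reply


async def run(use_lock):
    comms = FakeComms()
    return await asyncio.gather(
        ask(comms, "A", 0.02, use_lock),
        ask(comms, "B", 0.01, use_lock),
    )


def demo(use_lock):
    return asyncio.run(run(use_lock))
```

Without the lock, caller B reads the reply intended for A and vice versa; with the lock held across both the send and the read, each caller gets its own reply. The lock only helps if every reader of the channel goes through it.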

image

@gopidesupavan
Member Author

I captured it: it looks like workload.RunTrigger information is sometimes coming through.

image

@gopidesupavan
Member Author

I have created a task to fix the issue: #48820.

@gopidesupavan gopidesupavan force-pushed the fix-dagstate-trigger branch from c0dc087 to e604527 Compare April 4, 2025 23:30
@gopidesupavan
Member Author

I tried using an async lock, but no luck: the same errors occur. Any suggestions would be really helpful, and I will try them out.

@gopidesupavan
Member Author

gopidesupavan commented Apr 5, 2025

Nice, the issue is gone. Tested on top of the trigger change in #48835. DagStateTrigger is looking good now.

image

@gopidesupavan
Member Author

Example DAGs:
Parent DAG:

from __future__ import annotations

from airflow.decorators import dag
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator


@dag()
def test_parent_dag():
    # Trigger the child DAG in deferrable mode, polling every 5 seconds.
    t1 = TriggerDagRunOperator(
        task_id="test_trigger_dagrun_defferable",
        trigger_dag_id="test_child_dag",
        conf={"message": "Hello World"},
        allowed_states=["success"],
        deferrable=True,
        poke_interval=5,
    )
    # Downstream task that runs after the trigger task finishes.
    t1 >> EmptyOperator(task_id="empty1")


test_parent_dag()

Child dag:

from __future__ import annotations

from airflow.decorators import dag, task

@dag()
def test_child_dag():

    @task
    def my_task_one():
        print("Hello from child")

    @task
    def my_task_two():
        print("Hello from child")

    @task
    def my_task_three():
        print("Hello from child")

    my_task_one()
    my_task_two()
    my_task_three()


test_child_dag()

@gopidesupavan gopidesupavan force-pushed the fix-dagstate-trigger branch from e604527 to 90c9e0d Compare April 7, 2025 18:40
@gopidesupavan gopidesupavan requested a review from potiuk April 7, 2025 18:59
@gopidesupavan
Member Author

This is ready for review.

@kaxil kaxil merged commit 294594c into apache:main Apr 8, 2025
67 checks passed
@gopidesupavan gopidesupavan deleted the fix-dagstate-trigger branch May 28, 2025 20:30
Development

Successfully merging this pull request may close these issues.

Support deferral mode for TriggerDagRunOperator with Task SDK

4 participants