Skip to content

Conversation

@ashb
Copy link
Member

@ashb ashb commented Apr 7, 2025

The TriggerRunnerSupervisor previoulsy sent messges to the Triggerer directly
when a trigger needed to be created or cancelled, and this initially worked
fine, but when we later added the ability for Triggers to access Connections
and Variables this ended up causing problems.

The issue was that we broke one of the "rules" of comms, which is even
documented as such in airflow/sdk/exeuction_time/comms.py:

  • No messages are sent to task process except in response to a request.
    (This is because the task process will be running user's code, so we can't
    read from stdin until we enter our code, such as when requesting an XCom
    ) value etc.)

The net result of this was that since the supervisor sent messages
un-solicited, if things were timed badly, then the trigger would send a
request for a Conn, and the supervisor would reply with a trigger to create
etc.

This sort of instability was very likely the cause of our flakeyness in the
test_trigger_can_access_variables_connections_and_xcoms test.

This fixes things by making the TriggerRunnerSupervisor collect the triggers
to create/cancel in a deqeue in memory, and respond to the TriggerStateChanges
messages with TriggerStateSync with the info it would have sent on an
as-needed basis.

In order to make sure that we only send+receive one message at a time I have
swapped from the threading.lock to aiologic.Lock which allows for better
locking/waiting behaviour between threads (which is what Triggers will use via
the sync_to_async) and the main TriggerRunner.

Fixes #48820 in a different manner.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@ashb
Copy link
Member Author

ashb commented Apr 7, 2025

Tested by running 10 triggers concurrently that all try to access a Variable.

Screenshot 2025-04-07 at 16 27 30

The TriggerRunnerSupervisor previoulsy sent messges to the Triggerer directly
when a trigger needed to be created or cancelled, and this initially worked
fine, but when we later added the ability for Triggers to access Connections
and Variables this ended up causing problems.

The issue was that we broke one of the "rules" of comms, which is even
documents as such in `airflow/sdk/exeuction_time/comms.py`:

> * No messages are sent to task process except in response to a request.
>   (This is because the task process will be running user's code, so we can't
>   read from stdin until we enter our code, such as when requesting an XCom
)   value etc.)

The net result of this was that since the supervisor sent messages
un-solicited, if things were timed badly, then the trigger would send a
request for a Conn, and the supervisor would reply with a trigger to create
etc.

This sort of instability was very likely the cause of our flakeyness in the
`test_trigger_can_access_variables_connections_and_xcoms` test.

This fixes things by making the TriggerRunnerSupervisor collect the triggers
to create/cancel in a deqeue in memory, and respond to the TriggerStateChanges
messages with TriggerStateSync with the info it would have sent on an
as-needed basis.

In order to make sure that we only send+receive one message at a time I have
swapped from the `threading.lock` to `aiologic.Lock` which allows for better
locking/waiting behaviour between threads (which is what Triggers will use via
the `sync_to_async`) and the main TriggerRunner.
@ashb ashb force-pushed the trigger-comms-paths branch from 38c3c69 to 398de36 Compare April 7, 2025 15:44
Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
@ashb
Copy link
Member Author

ashb commented Apr 7, 2025

Looks like this hasn't fixed the flakey test. Dang

@kaxil kaxil merged commit da78b39 into apache:main Apr 7, 2025
67 checks passed
@kaxil kaxil deleted the trigger-comms-paths branch April 7, 2025 17:01
@gopidesupavan
Copy link
Member

Nice thanks Ash, just verified it looks fine now. One thing the locks should be used in async context, so likely the xcom and variable methods updates required.

@ashb
Copy link
Member Author

ashb commented Apr 7, 2025

One thing the locks should be used in async context, so likely the xcom and variable methods updates required.

aiologic has locks that are valid in sync and async context, so the existing lock that Connection etc takes works fine.

simonprydden pushed a commit to simonprydden/airflow that referenced this pull request Apr 8, 2025
The TriggerRunnerSupervisor previoulsy sent messges to the Triggerer directly
when a trigger needed to be created or cancelled, and this initially worked
fine, but when we later added the ability for Triggers to access Connections
and Variables this ended up causing problems.

The issue was that we broke one of the "rules" of comms, which is even
documents as such in `airflow/sdk/exeuction_time/comms.py`:

> * No messages are sent to task process except in response to a request.
>   (This is because the task process will be running user's code, so we can't
>   read from stdin until we enter our code, such as when requesting an XCom
)   value etc.)

The net result of this was that since the supervisor sent messages
un-solicited, if things were timed badly, then the trigger would send a
request for a Conn, and the supervisor would reply with a trigger to create
etc.

This sort of instability was very likely the cause of our flakeyness in the
`test_trigger_can_access_variables_connections_and_xcoms` test.

This fixes things by making the TriggerRunnerSupervisor collect the triggers
to create/cancel in a deqeue in memory, and respond to the TriggerStateChanges
messages with TriggerStateSync with the info it would have sent on an
as-needed basis.

In order to make sure that we only send+receive one message at a time I have
swapped from the `threading.lock` to `aiologic.Lock` which allows for better
locking/waiting behaviour between threads (which is what Triggers will use via
the `sync_to_async`) and the main TriggerRunner.

Fixes apache#48820
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK area:task-sdk area:Triggerer

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Triggerer messages mixing up when the the tasks triggers continuously

3 participants