-
Notifications
You must be signed in to change notification settings - Fork 14.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AIP 72: Handling "deferrable" tasks in execution_api and task SDK #44241
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A good start!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approving pre-emptively as the code looks good but the tests needs a bit more work.
1de3561
to
1051fc5
Compare
Squashed commits into 1 and resolved conflicts. Doing one final pass now |
1051fc5
to
c2b6003
Compare
187c6ee
to
5bdb4e1
Compare
5bdb4e1
to
4bd9d05
Compare
…ache#44241) closes: apache#44137 Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
closes: #44137
This PR is trying to port the "deferral" logic from airflow 2 to the airflow 3 (execution api + task sdk)
Summary of changes:
Server side changes (execution api):
TIDeferredStatePayload
inti_update_state
-> covered by unit test:test_ti_update_state_to_deferred
a. Didn't piggy back on
ti.defer_task()
as it extracts the trigger out ofTaskDeferred
exception. It is much more expensive to send across multiple models likeTaskInstance
,TaskDeferred
,Trigger
instead of just the required minimal propertiesb.
returning
and not proceeding with query execution as we already do it above https://github.com/apache/airflow/pull/44241/files#diff-d44a72566870079ee943e24bac2af74fb84c426c54d210561a251549a7078ed7L129Client side changes (task sdk):
HTTP client:
Added a new function defer that sends a patch request to the
task-instances/{id}/state execution
api with payload:PatchTIToDeferred
Comms:
Defining a new data model to send a request to patch ti as "deferred" from task runner to supervisor:
PatchTIToDeferred
(Added to ToSupervisor)Supervisor:
_final_state
to support@property
final_state
which is final state of a TIa. Added a setter to set values for this final state for cases like deferred so that the finish is not called for tasks those aren't in terminal stage: https://github.com/apache/airflow/pull/44241/files#diff-c2651fdee1a25e091e2a9d4f937f8032ca3d289d0de76f38ed88aee5df0f880dL392-L394
finish()
whenfinal_state
is notTerminalTIState
handle_requests
to receive requests from task runner and forwarding the message to http client to call deferTaskRunner:
Task runner executes: ti.task.execute and raises
TaskDeferred
for deferral. This sends a request to supervisor usingSUPERVISOR_COMMS
How was this tested?
test_handle_requests
covers the supervisor + client side of things along with a mock of the message from task runnertest_ti_update_state_to_deferred
covers the scenario for execution APItest_run_deferred_basic
tests if the DAG raised a task exception and sent the right message across the SUPERVISOR_COMMS or not.Additional sanity tests
Testing the "task runner" entries and exits (client side)
Wrote a async operator DAG:
Breakpoint inside task_runner.py#run
Validated the exception entrypoint with all the variables to be set as required by running the test:
test_run_basic
Testing the DB state in the server side (execution API)
Used the test:
test_ti_update_state_to_deferred
Debugger inside
ti_update_state
methodValidated the
TaskDeferred
and related logicChecked state of the DB on execution
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.