@seanghaeli
Contributor

The MwaaTaskSensor waits for the completion of a DAG task instance in an MWAA environment. This PR includes an implementation with unit tests, system tests, and docs. It is similar to MwaaDagRunSensor.

Also modified the system test to set deferrable=True on MwaaTriggerDagRunOperator. This tests the MwaaTaskSensor and MwaaDagRunSensor sensors during execution of DAG Run rather than only afterwards.
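
As a rough illustration of what "waiting for the completion of a task instance" involves, here is a minimal, self-contained sketch of a sensor-style polling loop. It does not call Airflow or MWAA; `wait_for_task`, `fetch_state`, the state strings, and the `max_pokes` limit are all hypothetical stand-ins chosen for this example.

```python
import time
from typing import Callable, Iterable


def wait_for_task(
    fetch_state: Callable[[], str],
    success_states: Iterable[str] = ("success",),
    failure_states: Iterable[str] = ("failed",),
    poke_interval: float = 0.0,
    max_pokes: int = 10,
) -> str:
    """Poll fetch_state() until it reports a success or failure state."""
    success, failure = set(success_states), set(failure_states)
    for _ in range(max_pokes):
        state = fetch_state()
        if state in success:
            return state
        if state in failure:
            raise RuntimeError(f"task ended in failure state: {state}")
        time.sleep(poke_interval)  # still in progress; check again later
    raise TimeoutError(f"task did not finish within {max_pokes} pokes")


# Usage with a fake state source standing in for the remote environment.
states = iter(["queued", "running", "running", "success"])
result = wait_for_task(lambda: next(states))
```

The loop only distinguishes three outcomes: finished successfully, finished unsuccessfully, or still in progress.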

@ramitkataria
Contributor

> This tests the MwaaTaskSensor and MwaaDagRunSensor sensors during execution of DAG Run rather than only afterwards

Wouldn't it still wait for the dag run to complete? I think in this case the waiting would always happen in deferrable mode instead of following the config value for operators.default_deferrable. Using the config value would probably be the preferred method, since we could then test both cases just by changing the config, without having to modify the code.

If we want to test the sensor during execution, we could run the dag again in another task before the sensor task, but I'm not sure if we want to be that exhaustive in system tests.
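
To make the difference between hard-coding `deferrable` and following the config concrete, here is a small sketch. It uses the standard-library `configparser` as a stand-in for Airflow's configuration object, and `resolve_deferrable` is a hypothetical helper, not part of any provider.

```python
from configparser import ConfigParser
from typing import Optional


def resolve_deferrable(conf: ConfigParser, deferrable: Optional[bool] = None) -> bool:
    """Return the effective flag: an explicit argument wins over the config."""
    if deferrable is None:
        # Falls back to False when the section or option is missing.
        return conf.getboolean("operators", "default_deferrable", fallback=False)
    return deferrable


conf = ConfigParser()
conf.read_dict({"operators": {"default_deferrable": "true"}})

from_config = resolve_deferrable(conf)                   # follows the config value
hard_coded = resolve_deferrable(conf, deferrable=False)  # ignores the config value
```

With the argument left unset, flipping the config value is enough to exercise both modes; once the value is hard-coded in the test, only one mode is ever tested.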

@seanghaeli
Contributor Author

seanghaeli commented Jun 18, 2025

> Wouldn't it still wait for the dag run to complete?

@ramitkataria With MwaaTriggerDagRunOperator's deferrable=True, wouldn't it proceed to the task sensor without waiting for the dag run to be done?

@ramitkataria
Contributor

> > Wouldn't it still wait for the dag run to complete?
>
> @ramitkataria With MwaaTriggerDagRunOperator's deferrable=True, wouldn't it proceed to the task sensor without waiting for the dag run to be done?

Also discussed offline, but in short: the sensor task would still wait for this task, because the two are in a chain and the sensor task is set to depend on it.
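
The point about the chain can be sketched with a toy readiness check. This is not Airflow's scheduler; `ready_to_start`, the task names, and the state strings are hypothetical, and the sketch assumes a downstream task starts only once every upstream task has succeeded.

```python
from typing import Dict, List


def ready_to_start(upstream: Dict[str, List[str]], states: Dict[str, str]) -> List[str]:
    """Return not-yet-started tasks whose upstream tasks have all succeeded."""
    return [
        task
        for task, parents in upstream.items()
        if states[task] == "none" and all(states[p] == "success" for p in parents)
    ]


# Hypothetical chain: trigger_dag_run >> wait_for_task
upstream = {"trigger_dag_run": [], "wait_for_task": ["trigger_dag_run"]}

# While the trigger task is deferred it has not succeeded yet,
# so the sensor task is not ready to start.
while_deferred = ready_to_start(
    upstream, {"trigger_dag_run": "deferred", "wait_for_task": "none"}
)

# Only after the trigger task succeeds does the sensor become ready.
after_trigger = ready_to_start(
    upstream, {"trigger_dag_run": "success", "wait_for_task": "none"}
)
```

Deferring changes where the upstream task waits, not when its downstream tasks become eligible to run.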

… adjust the default value of aws_conn_id in base class to aws_default.

- Add defensive test around adding more task instance states to keep in_progress_states of the MwaaTaskCompletedTrigger up to date.
- Fix issue where waiter_delay of the MwaaTaskSensor derives to float instead of int type.
- Modify documentation to clearly indicate that the MwaaTaskSensor is meant to sense tasks across different MWAA environments.
- Make external_dag_run_id an optional parameter, where it defaults to the latest dag run.
- Externally fetch the task ID variable.
- Test the sensor while a DAG Run is still in progress.
@seanghaeli
Contributor Author

I see that the commit message is rendering weird above so I'll rewrite it here for clarity:

  • Comply with PR #51196 (Rds Operator pass custom conn_id to superclass): explicitly pass aws_conn_id to its superclass, and adjust the default value of aws_conn_id in base class to aws_default.
  • Add defensive test around adding more task instance states to keep in_progress_states of the MwaaTaskCompletedTrigger up to date.
  • Fix issue where waiter_delay of the MwaaTaskSensor derives to float instead of int type.
  • Modify documentation to clearly indicate that the MwaaTaskSensor is meant to sense tasks across different MWAA environments.
  • Make external_dag_run_id an optional parameter, where it defaults to the latest dag run.
  • Externally fetch the task ID variable.
  • Test the sensor while a DAG Run is still in progress.
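
As an illustration of the optional `external_dag_run_id` item, the following sketch shows one way a "default to the latest dag run" rule could work. It is not the PR's implementation: `resolve_dag_run_id` is a hypothetical helper, and the `dag_run_id` and `logical_date` fields are an assumed shape for the list of runs.

```python
from datetime import datetime
from typing import Dict, List, Optional


def resolve_dag_run_id(
    dag_runs: List[Dict[str, str]],
    external_dag_run_id: Optional[str] = None,
) -> str:
    """Use the given run ID, or fall back to the most recent run."""
    if external_dag_run_id is not None:
        return external_dag_run_id
    if not dag_runs:
        raise ValueError("no DAG runs found to default to")
    # Assumption: each run carries an ISO 8601 logical_date string.
    latest = max(dag_runs, key=lambda run: datetime.fromisoformat(run["logical_date"]))
    return latest["dag_run_id"]


runs = [
    {"dag_run_id": "manual__1", "logical_date": "2025-06-17T08:00:00+00:00"},
    {"dag_run_id": "manual__2", "logical_date": "2025-06-18T08:00:00+00:00"},
]
defaulted = resolve_dag_run_id(runs)
explicit = resolve_dag_run_id(runs, external_dag_run_id="manual__1")
```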

- Brought UPSTREAM_FAILED to a Terminal Task Instance State instead of an Intermediate State.
- Added REMOVED to the list of successful terminal task instance states.
- Iterate programmatically through successful, failure, and in progress states instead of hard-coding.
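
The state changes listed in this commit can be sketched as follows. The enum is a local stand-in rather than an import from Airflow, its member list is written from memory, and placing SKIPPED among the success states is an assumption of this example; only the UPSTREAM_FAILED and REMOVED placements come from the commit message.

```python
from enum import Enum


class TaskInstanceState(str, Enum):
    """Local stand-in for the task instance states discussed in this PR."""

    REMOVED = "removed"
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    RESTARTING = "restarting"
    FAILED = "failed"
    UP_FOR_RETRY = "up_for_retry"
    UP_FOR_RESCHEDULE = "up_for_reschedule"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"
    DEFERRED = "deferred"


S = TaskInstanceState
SUCCESS_STATES = frozenset({S.SUCCESS, S.SKIPPED, S.REMOVED})
FAILURE_STATES = frozenset({S.FAILED, S.UPSTREAM_FAILED})
# Derived rather than hard-coded, so a newly added state lands here by default.
IN_PROGRESS_STATES = frozenset(s for s in S if s not in SUCCESS_STATES | FAILURE_STATES)

GROUPS = {
    "success": SUCCESS_STATES,
    "failure": FAILURE_STATES,
    "in_progress": IN_PROGRESS_STATES,
}


def classify(state: str) -> str:
    """Map a raw state string to the name of the group that contains it."""
    ti_state = TaskInstanceState(state)  # raises ValueError for unknown strings
    for name, members in GROUPS.items():
        if ti_state in members:
            return name
    raise ValueError(f"state {state!r} is not in any group")
```

Because the in-progress set is computed from the other two, every state belongs to exactly one group without a third hand-maintained list.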
@seanghaeli
Contributor Author

The two threads still in need of confirmation are: #51719 (comment) and #51719 (comment)

seanghaeli added a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Jul 23, 2025
- Brought UPSTREAM_FAILED to a Terminal Task Instance State instead of an Intermediate State.
- Added REMOVED to the list of successful terminal task instance states.
- Iterate programmatically through successful, failure, and in progress states instead of hard-coding.
@vincbeck
Contributor

Bad rebase @seanghaeli I think. You might want to undo it

@seanghaeli seanghaeli force-pushed the ghaeli/mwaa-task-sensor branch from 5b03461 to a9fcc58 Compare July 23, 2025 17:41
@seanghaeli
Contributor Author

> Bad rebase @seanghaeli I think. You might want to undo it

Thanks, good point. Reverted, but I can't untag the extra reviewers it pinged. Sorry all!

Contributor

@vincbeck vincbeck left a comment

LGTM, some mypy failures to take care of though

@eladkal eladkal dismissed their stale review August 1, 2025 16:33

stale

@vincbeck vincbeck merged commit 5b7de62 into apache:main Aug 5, 2025
75 checks passed
@vincbeck vincbeck deleted the ghaeli/mwaa-task-sensor branch August 5, 2025 17:38
HsiuChuanHsu pushed a commit to HsiuChuanHsu/airflow that referenced this pull request Aug 5, 2025
* Add MwaaTaskSensor to Amazon Provider Package

* include pre-commit hooks

* - Comply with PR apache#51196: explicitly pass aws_conn_id to its superclass, and adjust the default value of aws_conn_id in base class to aws_default.
- Add defensive test around adding more task instance states to keep in_progress_states of the MwaaTaskCompletedTrigger up to date.
- Fix issue where waiter_delay of the MwaaTaskSensor derives to float instead of int type.
- Modify documentation to clearly indicate that the MwaaTaskSensor is meant to sense tasks across different MWAA environments.
- Make external_dag_run_id an optional parameter, where it defaults to the latest dag run.
- Externally fetch the task ID variable.
- Test the sensor while a DAG Run is still in progress.

* documentation update

* Response to PR apache#51719 comments

- Brought UPSTREAM_FAILED to a Terminal Task Instance State instead of an Intermediate State.
- Added REMOVED to the list of successful terminal task instance states.
- Iterate programmatically through successful, failure, and in progress states instead of hard-coding.

* update tests

* Remove duplicate pointer to mwaatasksensor docs

* merge apache#53000

* Fix integration tests

* removed unnecessary defensive tests for trigger acceptor states after apache#53000 merge

* removed unnecessary defensive tests for trigger acceptor states after apache#53000 merge

* remove State file from PR

* Remove hard coding deferrable property

* Remove unnecessary execute_complete function in sensor and instead use end_from_trigger

* Correctly use end_from_trigger attribute

* Correctly use end_from_trigger attribute for both dag run sensor and task sensor

* Remove unnecessary import
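
The last few commits replace an `execute_complete` callback with `end_from_trigger`. As a loose model of that idea, the sketch below shows a trigger that either hands control back to the task or ends the task itself. Nothing here is Airflow's API: `ResumeEvent`, `TaskEndEvent`, `completion_trigger`, and `first_event` are hypothetical names used only to illustrate the two paths.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List, Optional


@dataclass
class ResumeEvent:
    """Stand-in for an event that sends the task back to a worker callback."""

    payload: dict


@dataclass
class TaskEndEvent:
    """Stand-in for an event that finishes the task without a callback."""

    final_state: str


async def completion_trigger(
    states: List[str], end_from_trigger: bool
) -> AsyncIterator[object]:
    """Walk through observed states and emit one event at the first terminal one."""
    for state in states:
        await asyncio.sleep(0)  # placeholder for the polling delay
        if state in ("success", "failed"):
            if end_from_trigger:
                yield TaskEndEvent(final_state=state)
            else:
                yield ResumeEvent(payload={"status": state})
            return


async def first_event(states: List[str], end_from_trigger: bool) -> Optional[object]:
    """Return the first event the trigger emits, or None if it emits nothing."""
    async for event in completion_trigger(states, end_from_trigger):
        return event
    return None


direct = asyncio.run(first_event(["running", "success"], end_from_trigger=True))
resumed = asyncio.run(first_event(["running", "success"], end_from_trigger=False))
```

In the first case no completion callback is needed on the sensor, which is the motivation the commit messages give for removing it.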
ferruzzi pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Aug 7, 2025
fweilun pushed a commit to fweilun/airflow that referenced this pull request Aug 11, 2025