Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run system tests using dag.test() #39176

Merged
merged 1 commit into from
Apr 24, 2024

Conversation

vincbeck
Copy link
Contributor

@vincbeck vincbeck commented Apr 22, 2024

Resolves #31826
Related and continuation of #36236

Deferrable operators cannot be run by DebugExecutor. DebugExecutor is the default executor when running system tests. The purpose of this PR is to no longer use DebugExecutor for system tests but dag.test(). See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/debug.html#testing-dags-with-dag-test


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

Copy link
Contributor

@syedahsn syedahsn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what the solution is, but I believe the reason the test does not pass is because pytest, which is running the test, exits out once it receives an Exception, which is what happens when a task is deferred. We know that when a Task is Deferred, it raises the TaskDeferred Exception. We would need to catch that Exception, and process it differently.

Here's an example that might be useful: #37542

@potiuk
Copy link
Member

potiuk commented Apr 22, 2024

It's because System Tests use debug executor that has no triggerer/deferred support https://github.com/apache/airflow/blob/main/tests/system/conftest.py#L29

Proper way would likely be to change System Tests to use airflow dag test under the hood which was replacement for Debug Executor: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/debug.html#testing-dags-with-dag-test

That would even enable to remove DebugExecutor because this is the last thing it is used for.

@potiuk
Copy link
Member

potiuk commented Apr 22, 2024

I think dag test had recently (2.8?) got the suppor to run deferred tasks

@potiuk
Copy link
Member

potiuk commented Apr 22, 2024

@vincbeck
Copy link
Contributor Author

vincbeck commented Apr 23, 2024

I see! Stupid question: why dont we use the SequentialExecutor for system tests instead of DebugExecutor. The change would be even easier no?

EDIT: nevermind, your solution is even easier. Looking good.

@vincbeck vincbeck force-pushed the vincbeck/sys_test_deferrable branch 2 times, most recently from a4fb824 to 3003562 Compare April 23, 2024 15:10
@vincbeck
Copy link
Contributor Author

The problem I have now (I updated the code) is the DAG always succeed, even though when it should fail. I guess the test.debug() swallow any exception which makes the DAG always succed?

@potiuk
Copy link
Member

potiuk commented Apr 23, 2024

The problem I have now (I updated the code) is the DAG always succeed, even though when it should fail. I guess the test.debug() swallow any exception which makes the DAG always succed?

The dag.test() returns DagRun and that dag run state should reflect whether the dag run failed or not - but I think it should be tested in the context of System Tests - so I guess switching to use deferrables might be a good opportunity.

@vincbeck vincbeck force-pushed the vincbeck/sys_test_deferrable branch 2 times, most recently from 99a8acf to cf01df7 Compare April 23, 2024 18:37
@vincbeck vincbeck changed the title Enable triggerer in system tests Run system tests using dag.test() Apr 23, 2024
@vincbeck
Copy link
Contributor Author

Thanks @potiuk for the information! That helps. I have an implementation working, could you take a look?

@vincbeck vincbeck force-pushed the vincbeck/sys_test_deferrable branch from cf01df7 to 5e5c288 Compare April 23, 2024 18:50
@vincbeck
Copy link
Contributor Author

I ran all AWS system tests using this and they all succeeded. I also tested the negative case by "breaking" some system tests. All seems fine

@vincbeck vincbeck marked this pull request as ready for review April 23, 2024 20:46
@potiuk
Copy link
Member

potiuk commented Apr 24, 2024

Looks good then.

@potiuk potiuk merged commit fece17a into apache:main Apr 24, 2024
39 checks passed
@vincbeck vincbeck deleted the vincbeck/sys_test_deferrable branch April 24, 2024 13:32
@vincbeck
Copy link
Contributor Author

Question on this one though. From the doc it says, dag.test() does not use an executor at all. So it is not possible, somehow, to configure to use a specific executor. I had in mind to possibly run some system tests with different executor (e.g. ECSExecutor). How hard would that be?

@potiuk
Copy link
Member

potiuk commented Apr 24, 2024

It's a completely different mechanism - it just runs _run_raw_task for all tasks according to dependencies - look at the code.

If you run it with a different executor, what you really need you need to run Airflow Scheduler with the executor and you need to trigger the DAG (via APi or CLI). You should not really use Pytest for that. Previously backfill was used to run the dags (this is how Debug Executor worked) - but that was a feature of Debug Executor - if you have ECS executor, then having scheduler to run the DAG is probably much better.

@vincbeck
Copy link
Contributor Author

vincbeck commented Jun 10, 2024

Just for testing, I tried to run a system test with the backfill approach with AwsEcsExecutor and it seems to work (besides the issue I mentioned here). At least an ECS task is triggered and tries to run the system test. Do you still do not recommend this approach @potiuk?

        dag.clear(dag_run_state=State.QUEUED)
        dag.run(
            local=False,
            executor=AwsEcsExecutor(),
        )

Edit: nevermind, it does not work. I'll go with the scheduler option

@potiuk
Copy link
Member

potiuk commented Jun 11, 2024

Edit: nevermind, it does not work. I'll go with the scheduler option

Thought so :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add a command line option to start a triggerer process with breeze
3 participants