Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-72: Handling skipped tasks in task_sdk #44786

Merged
merged 5 commits into from
Dec 10, 2024

Conversation

amoghrajesh
Copy link
Contributor

related: #44414

We already have support for handling terminal states from the task execution side as well as the task SDK client side. (almost).

This PR extends the task runner's run function to handle when an AirflowSkipException is thrown.

  1. Extended the task runner with the right msg when a AirflowSkipException is thrown
  2. Added a test in the test_handle_requests test of supervisor to show that no client call is made from handle_requests but is instead controlled by the wait function.
  3. Added an example DAG that throws AirflowSkipException and the task runner test that handles that.

Postman working example:

  1. Create a DAG run:
image
  1. Login into metadata DB and get the task_id for the task you want to skip
    image

  2. Once you have the task_id, send a patch request like this:

curl --location --request PATCH 'http://localhost:29091/execution/task-instances/0193ab1b-df6a-72bb-b888-04c541e6bf12/state' \
--data '{
    "state": "skipped",
    "end_date": "2024-10-31T12:00:00Z"
}'

image

  1. Check the Airflow UI if the task was marked as "skipped"
image

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@amoghrajesh amoghrajesh requested a review from ashb December 9, 2024 12:40
@amoghrajesh
Copy link
Contributor Author

This one is pretty straight forward. Merging it

@amoghrajesh amoghrajesh merged commit f92a845 into apache:main Dec 10, 2024
48 checks passed
@amoghrajesh amoghrajesh deleted the AIP72-task-skipped branch December 10, 2024 10:24
kaxil added a commit to astronomer/airflow that referenced this pull request Dec 10, 2024
closes: apache#44805
Dependent on apache#44786

Every time when we port the different TI state handling in the task runner, it is usually followed by an integration test of sorts to test the end to end flow of whether that state is testable or not. For example:
1. For skipped state, we use the DAG https://github.com/apache/airflow/pull/44786/files#diff-cabbddd33130ce1a769412f5fc55dd23e4af4d0fa75f8981689daae769e0680dR1 and we test using the UT in task runner: https://github.com/apache/airflow/pull/44786/files#diff-413c3c59636a3c7b41b8bb822827d18a959778d0b6331532e0db175c829dbfd2R141-R161
2. For deferred state, we use the DAG: https://github.com/apache/airflow/pull/44241/files#diff-2152ed5392424771e27a69173b3c18caae717939719df8f5dbbbdfee5f9efd9bR1 and test it using UT in task runner: https://github.com/apache/airflow/pull/44241/files#diff-413c3c59636a3c7b41b8bb822827d18a959778d0b6331532e0db175c829dbfd2R93-R127

Due to this, when new ti states are added or tests for that matter, it eventually leads to a huge folder with DAGs under `task_sdk/tests/dags` which could soon get ever growing and unmanageable.

The solution is in two parts:
1. The first part would be the ability to create dynamic or in line dags which has been implemented using a DAGFactory kind of function:
```
def get_inline_dag(dag_id: str, tasks: BaseOperator) -> DAG:
    dag = DAG(
        dag_id=dag_id,
        default_args={"start_date": timezone.datetime(2024, 12, 3)},
    )
    setattr(tasks, "dag", dag)

    return dag

```
This function is capable of accepting `one` task as of now and creating a DAG out of it and returning the DAG object which should suffice our current testing needs, if there is a need, we can extend this function to support more than one tasks and their relationships.
Usage:
```
    task = PythonOperator(
        task_id="skip",
        python_callable=lambda: (_ for _ in ()).throw(
            AirflowSkipException("This task is being skipped intentionally."),
        ),
    )

    dag = get_inline_dag("basic_skipped", task)
```
The usage is as simple as creating any task from any operator and passing it down to this function.

2. Mocking the parse function using KGB spy_agency: https://pypi.org/project/kgb/
The idea here is to use a spy agency to substitute out the `parse` function with a mock parser that does a bare minimum of the actual parser. We choose spy_agency over the mock library for two reasons primarily:
a) With `spy_agency`, you can mock specific methods or functions without affecting the entire class or module.
b) Minimal dispruption and ease of use.

1. Replaced usage of all "actual" dags with in line dags in task runner tests which either do parsing or run.
2. Deleted two DAGs
3. Cannot remove the other two DAGs as they are tied to test_supervisor.py tests which use the DAG path as of now. Can be taken in a follow up if needed. Example:
![image](https://github.com/user-attachments/assets/01baa82a-7b43-4ff1-bc7e-c2fc20cef50d)

1. No need to create any more DAG files for integration tests for task runner, which could be frequent with current development rate for AIP 72.
2. Ability to easily create in line DAGs.

Basic DAG
![image](https://github.com/user-attachments/assets/cf7a94b5-6c4c-4103-99a0-32047207a9b2)

deferred DAG
![image](https://github.com/user-attachments/assets/328f99d0-4483-48c5-9127-dd7812f47ae0)

Co-Authored-By: Kaxil Naik <kaxilnaik@gmail.com>
kaxil added a commit to astronomer/airflow that referenced this pull request Dec 10, 2024
closes: apache#44805
Dependent on apache#44786

Every time when we port the different TI state handling in the task runner, it is usually followed by an integration test of sorts to test the end to end flow of whether that state is testable or not. For example:
1. For skipped state, we use the DAG https://github.com/apache/airflow/pull/44786/files#diff-cabbddd33130ce1a769412f5fc55dd23e4af4d0fa75f8981689daae769e0680dR1 and we test using the UT in task runner: https://github.com/apache/airflow/pull/44786/files#diff-413c3c59636a3c7b41b8bb822827d18a959778d0b6331532e0db175c829dbfd2R141-R161
2. For deferred state, we use the DAG: https://github.com/apache/airflow/pull/44241/files#diff-2152ed5392424771e27a69173b3c18caae717939719df8f5dbbbdfee5f9efd9bR1 and test it using UT in task runner: https://github.com/apache/airflow/pull/44241/files#diff-413c3c59636a3c7b41b8bb822827d18a959778d0b6331532e0db175c829dbfd2R93-R127

Due to this, when new ti states are added or tests for that matter, it eventually leads to a huge folder with DAGs under `task_sdk/tests/dags` which could soon get ever growing and unmanageable.

The solution is in two parts:
1. The first part would be the ability to create dynamic or in line dags which has been implemented using a DAGFactory kind of function:
```
def get_inline_dag(dag_id: str, tasks: BaseOperator) -> DAG:
    dag = DAG(
        dag_id=dag_id,
        default_args={"start_date": timezone.datetime(2024, 12, 3)},
    )
    setattr(tasks, "dag", dag)

    return dag

```
This function is capable of accepting `one` task as of now and creating a DAG out of it and returning the DAG object which should suffice our current testing needs, if there is a need, we can extend this function to support more than one tasks and their relationships.
Usage:
```
    task = PythonOperator(
        task_id="skip",
        python_callable=lambda: (_ for _ in ()).throw(
            AirflowSkipException("This task is being skipped intentionally."),
        ),
    )

    dag = get_inline_dag("basic_skipped", task)
```
The usage is as simple as creating any task from any operator and passing it down to this function.

2. Mocking the parse function using KGB spy_agency: https://pypi.org/project/kgb/
The idea here is to use a spy agency to substitute out the `parse` function with a mock parser that does a bare minimum of the actual parser. We choose spy_agency over the mock library for two reasons primarily:
a) With `spy_agency`, you can mock specific methods or functions without affecting the entire class or module.
b) Minimal dispruption and ease of use.

1. Replaced usage of all "actual" dags with in line dags in task runner tests which either do parsing or run.
2. Deleted two DAGs
3. Cannot remove the other two DAGs as they are tied to test_supervisor.py tests which use the DAG path as of now. Can be taken in a follow up if needed. Example:
![image](https://github.com/user-attachments/assets/01baa82a-7b43-4ff1-bc7e-c2fc20cef50d)

1. No need to create any more DAG files for integration tests for task runner, which could be frequent with current development rate for AIP 72.
2. Ability to easily create in line DAGs.

Basic DAG
![image](https://github.com/user-attachments/assets/cf7a94b5-6c4c-4103-99a0-32047207a9b2)

deferred DAG
![image](https://github.com/user-attachments/assets/328f99d0-4483-48c5-9127-dd7812f47ae0)

Co-Authored-By: Kaxil Naik <kaxilnaik@gmail.com>
kaxil pushed a commit that referenced this pull request Dec 10, 2024
closes: #44805
Dependent on #44786

Every time when we port the different TI state handling in the task runner, it is usually followed by an integration test of sorts to test the end to end flow of whether that state is testable or not. For example:
1. For skipped state, we use the DAG https://github.com/apache/airflow/pull/44786/files#diff-cabbddd33130ce1a769412f5fc55dd23e4af4d0fa75f8981689daae769e0680dR1 and we test using the UT in task runner: https://github.com/apache/airflow/pull/44786/files#diff-413c3c59636a3c7b41b8bb822827d18a959778d0b6331532e0db175c829dbfd2R141-R161
2. For deferred state, we use the DAG: https://github.com/apache/airflow/pull/44241/files#diff-2152ed5392424771e27a69173b3c18caae717939719df8f5dbbbdfee5f9efd9bR1 and test it using UT in task runner: https://github.com/apache/airflow/pull/44241/files#diff-413c3c59636a3c7b41b8bb822827d18a959778d0b6331532e0db175c829dbfd2R93-R127

Due to this, when new ti states are added or tests for that matter, it eventually leads to a huge folder with DAGs under `task_sdk/tests/dags` which could soon get ever growing and unmanageable.

The solution is in two parts:
1. The first part would be the ability to create dynamic or in line dags which has been implemented using a DAGFactory kind of function:
```
def get_inline_dag(dag_id: str, tasks: BaseOperator) -> DAG:
    dag = DAG(
        dag_id=dag_id,
        default_args={"start_date": timezone.datetime(2024, 12, 3)},
    )
    setattr(tasks, "dag", dag)

    return dag

```
This function is capable of accepting `one` task as of now and creating a DAG out of it and returning the DAG object which should suffice our current testing needs, if there is a need, we can extend this function to support more than one tasks and their relationships.
Usage:
```
    task = PythonOperator(
        task_id="skip",
        python_callable=lambda: (_ for _ in ()).throw(
            AirflowSkipException("This task is being skipped intentionally."),
        ),
    )

    dag = get_inline_dag("basic_skipped", task)
```
The usage is as simple as creating any task from any operator and passing it down to this function.

2. Mocking the parse function using KGB spy_agency: https://pypi.org/project/kgb/
The idea here is to use a spy agency to substitute out the `parse` function with a mock parser that does a bare minimum of the actual parser. We choose spy_agency over the mock library for two reasons primarily:
a) With `spy_agency`, you can mock specific methods or functions without affecting the entire class or module.
b) Minimal dispruption and ease of use.

1. Replaced usage of all "actual" dags with in line dags in task runner tests which either do parsing or run.
2. Deleted two DAGs
3. Cannot remove the other two DAGs as they are tied to test_supervisor.py tests which use the DAG path as of now. Can be taken in a follow up if needed. Example:
![image](https://github.com/user-attachments/assets/01baa82a-7b43-4ff1-bc7e-c2fc20cef50d)

1. No need to create any more DAG files for integration tests for task runner, which could be frequent with current development rate for AIP 72.
2. Ability to easily create in line DAGs.

Basic DAG
![image](https://github.com/user-attachments/assets/cf7a94b5-6c4c-4103-99a0-32047207a9b2)

deferred DAG
![image](https://github.com/user-attachments/assets/328f99d0-4483-48c5-9127-dd7812f47ae0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

2 participants