Skip to content

Conversation

@kaxil
Copy link
Member

@kaxil kaxil commented Mar 27, 2025

As we are replacing BaseOperator usage from Core to Task SDK, we are running into several issues, one of the common one being over-usage of task.run().

While some cases can be easily replaced by task.execute() others needs execution of the tasks, sharing of XCom's in between, checking task state, correct exception etc.

To make this easier I have added run_task fixture which I have been using in #48244 and it has worked out well. This also takes care of creating a DAG if not provided.

Example Usage

Basic Task Execution

def test_basic_task(run_task):
    class MyTaskOperator(BaseOperator):
        def execute(self, context):
            return "hello"

    task = MyTaskOperator(task_id="test_task")
    run_task(task)
    assert run_task.state == TerminalTIState.SUCCESS
    assert run_task.error is None

Testing XCom Operations

def test_xcom_operations(run_task):
    class UpstreamTask(BaseOperator):
        def execute(self, context):
            context["task_instance"].xcom_push(key="my_value", value=100)
            return "done"

    class DownstreamTask(BaseOperator):
        def execute(self, context):
            value = context["task_instance"].xcom_pull(key="my_value")
            assert value == 100
            return "done"

    # Run upstream task
    upstream = UpstreamTask(task_id="upstream")
    run_task(upstream)
    
    # Verify XCom was pushed
    run_task.xcom.assert_pushed("my_value", 100)
    
    # Run downstream task
    downstream = DownstreamTask(task_id="downstream")
    run_task(downstream)
    
    # Verify XCom can be retrieved
    assert run_task.xcom.get("my_value") == 100

Testing Task Failures

def test_task_failure(run_task):
    class FailingTask(BaseOperator):
        def execute(self, context):
            raise ValueError("Task failed")

    task = FailingTask(task_id="failing_task")
    run_task(task)
    assert run_task.state == TerminalTIState.FAILED
    assert isinstance(run_task.error, ValueError)

Migration Guide

Replace existing ti.run() usage with the new fixture:

Before:

def test_old_style():
    dag = DAG("test_dag")
    task = MyTaskOperator(task_id="test_task", dag=dag)
    ti = TaskInstance(task=task, execution_date=datetime.now())
    ti.run()
    assert ti.state == State.SUCCESS

After:

def test_new_style(run_task):
    task = MyTaskOperator(task_id="test_task")
    run_task(task)
    assert run_task.state == TerminalTIState.SUCCESS

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

As we are replacing BaseOperator usage from Core to Task SDK, we are running into several issues, one of the common one being over-usage of `task.run()`.

While some cases can be easily replaced by `task.execute()` others needs execution of the tasks, sharing of XCom's in between, checking task state, correct exception etc.

To make this easier I have added `run_task` fixture which I have been using in apache#48244 and it has worked out well.

Example:
@kaxil kaxil requested review from amoghrajesh, ashb and uranusjr March 27, 2025 12:34
Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I almost wonder if it's worth defining some of the classes and protocols in another file inside tests_common package, so that the fixture can be smaller.

For example something like this

from ... import RunTaskCallable

@pytest.fixture
def run_task(create_runtime_ti, mock_supervisor_comms, spy_agency) -> RunTaskCallable:
    """
    Fixture to run a task without defining a dag file.
    This fixture builds on top of create_runtime_ti to provide a convenient way to execute tasks and get their results.
    The fixture provides:
    - run_task.state - Get the task state
    - run_task.msg - Get the task message
    - run_task.error - Get the task error
    - run_task.xcom.get(key) - Get an XCom value
    - run_task.xcom.assert_pushed(key, value, ...) - Assert an XCom was pushed
    Example usage: ::
        def test_custom_task(run_task):
            class MyTaskOperator(BaseOperator):
                def execute(self, context):
                    return "hello"
            task = MyTaskOperator(task_id="test_task")
            run_task(task)
            assert run_task.state == TerminalTIState.SUCCESS
            assert run_task.error is None
    """
    return RunTaskWithXCom(create_runtime_ti, mock_supervisor_comms, spy_agency)

@kaxil kaxil merged commit 80840cb into apache:main Mar 27, 2025
7 checks passed
@kaxil kaxil deleted the conv-test-fixtures branch March 27, 2025 13:23
@kaxil
Copy link
Member Author

kaxil commented Mar 27, 2025

I almost wonder if it's worth defining some of the classes and protocols in another file inside tests_common package, so that the fixture can be smaller.

Good idea, might take a stab at it it later on with other fixtures too.

pankajkoti pushed a commit to astronomer/airflow that referenced this pull request Mar 28, 2025
As we are replacing BaseOperator usage from Core to Task SDK, we are running into several issues, one of the common one being over-usage of `task.run()`.

While some cases can be easily replaced by `task.execute()` others needs execution of the tasks, sharing of XCom's in between, checking task state, correct exception etc.

To make this easier I have added `run_task` fixture which I have been using in apache#48244 and it has worked out well.

Example:
shubham-pyc pushed a commit to shubham-pyc/airflow that referenced this pull request Apr 2, 2025
As we are replacing BaseOperator usage from Core to Task SDK, we are running into several issues, one of the common one being over-usage of `task.run()`.

While some cases can be easily replaced by `task.execute()` others needs execution of the tasks, sharing of XCom's in between, checking task state, correct exception etc.

To make this easier I have added `run_task` fixture which I have been using in apache#48244 and it has worked out well.

Example:
nailo2c pushed a commit to nailo2c/airflow that referenced this pull request Apr 4, 2025
As we are replacing BaseOperator usage from Core to Task SDK, we are running into several issues, one of the common one being over-usage of `task.run()`.

While some cases can be easily replaced by `task.execute()` others needs execution of the tasks, sharing of XCom's in between, checking task state, correct exception etc.

To make this easier I have added `run_task` fixture which I have been using in apache#48244 and it has worked out well.

Example:
simonprydden pushed a commit to simonprydden/airflow that referenced this pull request Apr 8, 2025
As we are replacing BaseOperator usage from Core to Task SDK, we are running into several issues, one of the common one being over-usage of `task.run()`.

While some cases can be easily replaced by `task.execute()` others needs execution of the tasks, sharing of XCom's in between, checking task state, correct exception etc.

To make this easier I have added `run_task` fixture which I have been using in apache#48244 and it has worked out well.

Example:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants