Skip to content

Commit

Permalink
Add unit test to cover back compat case in celery
Browse files Browse the repository at this point in the history
A unit test which triggers the scenario of a current Celery executor
with the new signature of change_state run against an older version of
Airflow with the old signature of change_state on the BaseExecutor
class.

related apache#40011
related apache#39980
related apache#40012
  • Loading branch information
o-nikolas committed Jun 4, 2024
1 parent 21dae6e commit 9075846
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion tests/integration/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import os
import sys
from datetime import datetime
from importlib import reload
from time import sleep
from unittest import mock

Expand All @@ -37,10 +38,12 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowTaskTimeout
from airflow.executors import base_executor
from airflow.models.dag import DAG
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.operators.bash import BashOperator
from airflow.utils.state import State
from airflow.utils.state import State, TaskInstanceState
from tests.test_utils import db

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -107,6 +110,27 @@ def teardown_method(self) -> None:
db.clear_db_runs()
db.clear_db_jobs()

def test_change_state_back_compat(self):
# This represents the old implementation that an Airflow package may have
def _change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=None) -> None:
pass

# Replace change_state function on base executor with the old version to force the backcompat edge
# case we're looking for
base_executor.BaseExecutor.change_state = _change_state
# Create an instance of celery executor while the base executor is modified
from airflow.providers.celery.executors import celery_executor

executor = celery_executor.CeleryExecutor()

# This will throw an exception if the backcompat is not properly handled
executor.change_state(
key=TaskInstanceKey("foo", "bar", "baz"), state=TaskInstanceState.QUEUED, info="test"
)
# Restore the base executor and celery modules
reload(base_executor)
reload(celery_executor)

@pytest.mark.flaky(reruns=3)
@pytest.mark.parametrize("broker_url", _prepare_test_bodies())
def test_celery_integration(self, broker_url):
Expand Down

0 comments on commit 9075846

Please sign in to comment.