diff --git a/airflow-core/tests/unit/api_fastapi/auth/test_tokens.py b/airflow-core/tests/unit/api_fastapi/auth/test_tokens.py index b17c8147dae24..34ac077f8e220 100644 --- a/airflow-core/tests/unit/api_fastapi/auth/test_tokens.py +++ b/airflow-core/tests/unit/api_fastapi/auth/test_tokens.py @@ -21,6 +21,7 @@ import pathlib from datetime import datetime, timedelta from typing import TYPE_CHECKING +from unittest.mock import patch import httpx import jwt @@ -80,28 +81,36 @@ async def mock_transport(request): return httpx.Response(status_code=200, content=jwk_content) client = httpx.AsyncClient(transport=httpx.MockTransport(mock_transport)) - jwks = JWKS(url="https://example.com/jwks.json", client=client) - spy = spy_agency.spy_on(JWKS._fetch_remote_jwks) - - key = await jwks.get_key("kid") - assert isinstance(key, jwt.PyJWK) - - # Move forward in time, but not to a point where it updates. Should not end up re-requesting. - spy.reset_calls() - time_machine.shift(1800) - assert await jwks.get_key("kid") is key - spy_agency.assert_spy_not_called(spy) - - # Not to a point where it should refresh - time_machine.shift(1801) - - key2 = key_to_jwk_dict(generate_private_key("Ed25519"), "kid2") - jwk_content = json.dumps({"keys": [key2]}) - with pytest.raises(KeyError): - # Not in the document anymore, should have gone from the keyset - await jwks.get_key("kid") - assert isinstance(await jwks.get_key("kid2"), jwt.PyJWK) - spy_agency.assert_spy_called(spy) + current = 1_000_000.0 + + def mock_monotonic(): + return current + + with patch("airflow.api_fastapi.auth.tokens.time.monotonic", side_effect=mock_monotonic): + jwks = JWKS(url="https://example.com/jwks.json", client=client) + spy = spy_agency.spy_on(JWKS._fetch_remote_jwks) + + key = await jwks.get_key("kid") + assert isinstance(key, jwt.PyJWK) + + # Move forward in time, but not to a point where it updates. Should not end up re-requesting. + spy.reset_calls() + # time_machine.shift(1800) + current += 1800 + assert await jwks.get_key("kid") is key + spy_agency.assert_spy_not_called(spy) + + # Not to a point where it should refresh + # time_machine.shift(1801) + current += 1801 + + key2 = key_to_jwk_dict(generate_private_key("Ed25519"), "kid2") + jwk_content = json.dumps({"keys": [key2]}) + with pytest.raises(KeyError): + # Not in the document anymore, should have gone from the keyset + await jwks.get_key("kid") + assert isinstance(await jwks.get_key("kid2"), jwt.PyJWK) + spy_agency.assert_spy_called(spy) def test_load_pk_from_file(tmp_path: pathlib.Path, rsa_private_key): diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 3959c59adf896..181a3db9defcc 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -526,7 +526,7 @@ def run_with_error(ti): session.get(TaskInstance, ti.id).try_number += 1 # second run -- still up for retry because retry_delay hasn't expired - time_machine.coordinates.shift(3) + time_machine.shift(3) run_with_error(ti) assert ti.state == State.UP_FOR_RETRY assert ti.try_number == 2 @@ -535,7 +535,7 @@ def run_with_error(ti): session.get(TaskInstance, ti.id).try_number += 1 # third run -- failed - time_machine.coordinates.shift(datetime.datetime.resolution) + time_machine.shift(datetime.datetime.resolution) run_with_error(ti) assert ti.state == State.FAILED assert ti.try_number == 3 diff --git a/devel-common/pyproject.toml b/devel-common/pyproject.toml index bea57e00dea70..018caba0a360b 100644 --- a/devel-common/pyproject.toml +++ b/devel-common/pyproject.toml @@ -38,8 +38,7 @@ dependencies = [ "ruff==0.14.13", "semver>=3.0.2", "typer-slim>=0.15.1", - # limited due to changes needed https://github.com/apache/airflow/issues/58470 - "time-machine>=2.19.0, <3.0.0", + "time-machine[dateutil]>=3.0.0", "wheel>=0.42.0", "yamllint>=1.33.0", "python-on-whales>=0.70.0", diff --git a/providers/amazon/tests/unit/amazon/aws/sensors/test_s3.py b/providers/amazon/tests/unit/amazon/aws/sensors/test_s3.py index d5a697cbdb853..af483be634202 100644 --- a/providers/amazon/tests/unit/amazon/aws/sensors/test_s3.py +++ b/providers/amazon/tests/unit/amazon/aws/sensors/test_s3.py @@ -542,16 +542,16 @@ def test_key_changes(self, current_objects, expected_returns, inactivity_periods for current, expected, period in zip(current_objects, expected_returns, inactivity_periods): assert self.sensor.is_keys_unchanged(current) == expected assert self.sensor.inactivity_seconds == period - time_machine.coordinates.shift(10) + time_machine.shift(10) def test_poke_succeeds_on_upload_complete(self, time_machine): time_machine.move_to(DEFAULT_DATE) self.sensor.hook = mock.MagicMock() self.sensor.hook.list_keys.return_value = {"a"} assert not self.sensor.poke(dict()) - time_machine.coordinates.shift(10) + time_machine.shift(10) assert not self.sensor.poke(dict()) - time_machine.coordinates.shift(10) + time_machine.shift(10) assert self.sensor.poke(dict()) def test_fail_is_keys_unchanged(self): diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py index 93e73f707cec3..4aa7a72f9629f 100644 --- a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py +++ b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py @@ -349,9 +349,22 @@ def fake_sleep(seconds): overall_delta = timedelta(seconds=seconds) + timedelta(microseconds=42) time_machine.shift(overall_delta) + current = 1_000_000.0 + + def mock_monotonic(): + nonlocal current + # Shift frozen time every time we call a ``time.monotonic`` during this test case. + # Time is shifted as per passing time with time.sleep is mocked at the same level by 60 sec and 42 microseconds. + # which is emulating time which we spent in a loop + # Mocking of time.monotonic() and time.monotonic_ns() deprecated from time-machine 3.0.0 + overall_delta = timedelta(seconds=60, microseconds=42) + current += overall_delta.total_seconds() + return current + with ( patch.object(DbtCloudHook, "get_job_run") as mock_get_job_run, patch("airflow.providers.dbt.cloud.hooks.dbt.time.sleep", side_effect=fake_sleep), + patch("airflow.providers.dbt.cloud.hooks.dbt.time.monotonic", side_effect=mock_monotonic), ): mock_get_job_run.return_value.json.return_value = { "data": {"status": job_run_status, "id": RUN_ID} diff --git a/task-sdk/tests/task_sdk/bases/test_sensor.py b/task-sdk/tests/task_sdk/bases/test_sensor.py index 14d4b7a4cf720..bc7a340a702f5 100644 --- a/task-sdk/tests/task_sdk/bases/test_sensor.py +++ b/task-sdk/tests/task_sdk/bases/test_sensor.py @@ -175,7 +175,7 @@ def test_ok_with_reschedule(self, run_task, make_sensor, time_machine): assert msg.reschedule_date == date1 + timedelta(seconds=sensor.poke_interval) # second poke returns False and task is re-scheduled - time_machine.coordinates.shift(sensor.poke_interval) + time_machine.shift(sensor.poke_interval) date2 = date1 + timedelta(seconds=sensor.poke_interval) state, msg, _ = run_task(task=sensor) @@ -183,7 +183,7 @@ def test_ok_with_reschedule(self, run_task, make_sensor, time_machine): assert msg.reschedule_date == date2 + timedelta(seconds=sensor.poke_interval) # third poke returns True and task succeeds - time_machine.coordinates.shift(sensor.poke_interval) + time_machine.shift(sensor.poke_interval) state, _, _ = run_task(task=sensor) assert state == TaskInstanceState.SUCCESS @@ -201,7 +201,7 @@ def test_fail_with_reschedule(self, run_task, make_sensor, time_machine, mock_su assert msg.reschedule_date == date1 + timedelta(seconds=sensor.poke_interval) # second poke returns False, timeout occurs - time_machine.coordinates.shift(sensor.poke_interval) + time_machine.shift(sensor.poke_interval) # Mocking values from DB/API-server mock_supervisor_comms.send.return_value = TaskRescheduleStartDate(start_date=date1) @@ -223,7 +223,7 @@ def test_soft_fail_with_reschedule(self, run_task, make_sensor, time_machine, mo assert state == TaskInstanceState.UP_FOR_RESCHEDULE # second poke returns False, timeout occurs - time_machine.coordinates.shift(sensor.poke_interval) + time_machine.shift(sensor.poke_interval) # Mocking values from DB/API-server mock_supervisor_comms.send.return_value = TaskRescheduleStartDate(start_date=date1) @@ -258,7 +258,7 @@ def run_duration(): # loop poke returns false for _poke_count in range(1, false_count + 1): curr_date = curr_date + timedelta(seconds=new_interval) - time_machine.coordinates.shift(new_interval) + time_machine.shift(new_interval) state, msg, _ = run_task(sensor, context_update={"task_reschedule_count": _poke_count}) assert state == TaskInstanceState.UP_FOR_RESCHEDULE old_interval = new_interval @@ -268,7 +268,7 @@ def run_duration(): # last poke returns True and task succeeds curr_date = curr_date + timedelta(seconds=new_interval) - time_machine.coordinates.shift(new_interval) + time_machine.shift(new_interval) state, msg, _ = run_task(sensor, context_update={"task_reschedule_count": false_count + 1}) assert state == TaskInstanceState.SUCCESS diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index b45f57cd4256e..cbc9c21d72846 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -664,9 +664,16 @@ def test_supervise_handles_deferred_task( mock_client.task_instances.start.return_value = make_ti_context() time_machine.move_to(instant, tick=False) + current = 1_000_000.0 + + def mock_monotonic(): + return current bundle_info = BundleInfo(name="my-bundle", version=None) - with patch.dict(os.environ, local_dag_bundle_cfg(test_dags_dir, bundle_info.name)): + with ( + patch.dict(os.environ, local_dag_bundle_cfg(test_dags_dir, bundle_info.name)), + patch("airflow.sdk.execution_time.supervisor.time.monotonic", side_effect=mock_monotonic), + ): exit_code = supervise( ti=ti, dag_rel_path="super_basic_deferred_run.py", @@ -887,33 +894,42 @@ def test_heartbeat_failures_handling(self, monkeypatch, mocker, captured_logs, t client=client, process=mock_process, ) + current = min_heartbeat_interval - time_now = timezone.datetime(2024, 11, 28, 12, 0, 0) - time_machine.move_to(time_now, tick=False) + def mock_monotonic(): + return current - # Simulate sending heartbeats and ensure the process gets killed after max retries - for i in range(1, max_failed_heartbeats): - proc._send_heartbeat_if_needed() - assert proc.failed_heartbeats == i # Increment happens after failure - mock_client_heartbeat.assert_called_with(TI_ID, pid=mock_process.pid) - - # Ensure the retry log is present - expected_log = { - "event": "Failed to send heartbeat. Will be retried", - "failed_heartbeats": i, - "ti_id": TI_ID, - "max_retries": max_failed_heartbeats, - "level": "warning", - "logger": "supervisor", - "timestamp": mocker.ANY, - "exc_info": mocker.ANY, - "loc": mocker.ANY, - } + with patch( + "airflow.sdk.execution_time.supervisor.time.monotonic", + side_effect=mock_monotonic, + ): + time_now = timezone.datetime(2024, 11, 28, 12, 0, 0) + time_machine.move_to(time_now, tick=False) + + # Simulate sending heartbeats and ensure the process gets killed after max retries + for i in range(1, max_failed_heartbeats): + proc._send_heartbeat_if_needed() + assert proc.failed_heartbeats == i # Increment happens after failure + mock_client_heartbeat.assert_called_with(TI_ID, pid=mock_process.pid) + + # Ensure the retry log is present + expected_log = { + "event": "Failed to send heartbeat. Will be retried", + "failed_heartbeats": i, + "ti_id": TI_ID, + "max_retries": max_failed_heartbeats, + "level": "warning", + "logger": "supervisor", + "timestamp": mocker.ANY, + "exc_info": mocker.ANY, + "loc": mocker.ANY, + } - assert expected_log in captured_logs + assert expected_log in captured_logs - # Advance time by `min_heartbeat_interval` to allow the next heartbeat - time_machine.shift(min_heartbeat_interval) + # Advance time by `min_heartbeat_interval` to allow the next heartbeat + # time_machine.shift(min_heartbeat_interval) + current += min_heartbeat_interval # On the final failure, the process should be killed proc._send_heartbeat_if_needed() @@ -1070,7 +1086,7 @@ def test_cleanup_sockets_after_delay(self, monkeypatch, mocker, time_machine): mock_process = mocker.Mock(pid=12345) - time_machine.move_to(time.monotonic(), tick=False) + time_machine.move_to(time.time(), tick=False) proc = ActivitySubprocess( process_log=mocker.MagicMock(), @@ -1087,7 +1103,7 @@ def test_cleanup_sockets_after_delay(self, monkeypatch, mocker, time_machine): proc._exit_code = 0 # Create a fake placeholder in the open socket weakref proc._open_sockets[mocker.MagicMock()] = "test placeholder" - proc._process_exit_monotonic = time.monotonic() + proc._process_exit_monotonic = time.time() mocker.patch.object( ActivitySubprocess, @@ -1299,7 +1315,7 @@ def test_max_wait_time_prevents_cpu_spike(self, watched_subprocess, mock_process # Set up a scenario where the last successful heartbeat was a long time ago # This will cause the heartbeat calculation to result in a negative value - mock_process._last_successful_heartbeat = time.monotonic() - 100 # 100 seconds ago + mock_process._last_successful_heartbeat = time.time() - 100 # 100 seconds ago # Mock process to still be alive (not exited) mock_process.wait.side_effect = psutil.TimeoutExpired(pid=12345, seconds=0) @@ -1342,7 +1358,7 @@ def test_max_wait_time_calculation_edge_cases( monkeypatch.setattr("airflow.sdk.execution_time.supervisor.HEARTBEAT_TIMEOUT", heartbeat_timeout) monkeypatch.setattr("airflow.sdk.execution_time.supervisor.MIN_HEARTBEAT_INTERVAL", min_interval) - watched_subprocess._last_successful_heartbeat = time.monotonic() - heartbeat_ago + watched_subprocess._last_successful_heartbeat = time.time() - heartbeat_ago mock_process.wait.side_effect = psutil.TimeoutExpired(pid=12345, seconds=0) # Call the method and verify timeout is never less than our minimum