From 2bd803a395a931d97e820014c773a26d072aaa98 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 29 Jul 2025 06:04:08 +0300 Subject: [PATCH 1/3] feat(HITL): improve hitl trigger logging message --- .../providers/standard/triggers/hitl.py | 7 ++++++- .../tests/unit/standard/triggers/test_hitl.py | 19 ++++++++++++++++--- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/triggers/hitl.py b/providers/standard/src/airflow/providers/standard/triggers/hitl.py index 63cea15363717..86107bae0efd8 100644 --- a/providers/standard/src/airflow/providers/standard/triggers/hitl.py +++ b/providers/standard/src/airflow/providers/standard/triggers/hitl.py @@ -111,6 +111,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]: chosen_options=self.defaults, params_input=self.params, ) + self.log.info( + "[HITL] timeout reached before receiving response, fallback to default %s", self.defaults + ) yield TriggerEvent( HITLTriggerEventSuccessPayload( chosen_options=self.defaults, @@ -121,7 +124,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]: resp = await sync_to_async(get_hitl_detail_content_detail)(ti_id=self.ti_id) if resp.response_received and resp.chosen_options: - self.log.info("Responded by %s at %s", resp.user_id, resp.response_at) + self.log.info( + "[HITL] user=%s options=%s at %s", resp.user_id, resp.chosen_options, resp.response_at + ) yield TriggerEvent( HITLTriggerEventSuccessPayload( chosen_options=resp.chosen_options, diff --git a/providers/standard/tests/unit/standard/triggers/test_hitl.py b/providers/standard/tests/unit/standard/triggers/test_hitl.py index ac96d9eed1e07..239ec82534634 100644 --- a/providers/standard/tests/unit/standard/triggers/test_hitl.py +++ b/providers/standard/tests/unit/standard/triggers/test_hitl.py @@ -25,7 +25,8 @@ pytest.skip("Human in the loop public API compatible with Airflow >= 3.0.1", allow_module_level=True) import asyncio -from datetime import timedelta +import logging +from datetime import datetime, timedelta from unittest import mock from uuid6 import uuid7 @@ -99,7 +100,9 @@ async def test_run_failed_due_to_timeout(self, mock_update, mock_supervisor_comm @pytest.mark.db_test @pytest.mark.asyncio @mock.patch("airflow.sdk.execution_time.hitl.update_htil_detail_response") - async def test_run_fallback_to_default_due_to_timeout(self, mock_update, mock_supervisor_comms): + async def test_run_fallback_to_default_due_to_timeout(self, mock_update, mock_supervisor_comms, caplog): + caplog.set_level(logging.INFO) + trigger = HITLTrigger( ti_id=TI_ID, options=["1", "2", "3", "4", "5"], @@ -121,6 +124,7 @@ async def test_run_fallback_to_default_due_to_timeout(self, mock_update, mock_su trigger_task = asyncio.create_task(gen.__anext__()) await asyncio.sleep(0.3) event = await trigger_task + assert event == TriggerEvent( HITLTriggerEventSuccessPayload( chosen_options=["1"], @@ -128,10 +132,17 @@ async def test_run_fallback_to_default_due_to_timeout(self, mock_update, mock_su ) ) + assert ( + "[HITL] timeout reached before receiving response, fallback to default ['1']" in caplog.messages + ) + @pytest.mark.db_test @pytest.mark.asyncio @mock.patch("airflow.sdk.execution_time.hitl.update_htil_detail_response") - async def test_run(self, mock_update, mock_supervisor_comms): + async def test_run(self, mock_update, mock_supervisor_comms, time_machine, caplog): + time_machine.move_to(datetime(2025, 7, 29, 2, 0, 0)) + caplog.set_level(logging.INFO) + trigger = HITLTrigger( ti_id=TI_ID, options=["1", "2", "3", "4", "5"], @@ -159,3 +170,5 @@ async def test_run(self, mock_update, mock_supervisor_comms): params_input={"input": 50}, ) ) + + assert "[HITL] user=test options=['3'] at 2025-07-29 02:00:00+00:00" in caplog.messages From 3eef52725c5b18d287b5101ff0b2b092db5fd875 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 29 Jul 2025 06:45:28 +0300 Subject: [PATCH 2/3] fixup! feat(HITL): improve hitl trigger logging message --- .../tests/unit/standard/triggers/test_hitl.py | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/providers/standard/tests/unit/standard/triggers/test_hitl.py b/providers/standard/tests/unit/standard/triggers/test_hitl.py index 239ec82534634..cb221bd1664a7 100644 --- a/providers/standard/tests/unit/standard/triggers/test_hitl.py +++ b/providers/standard/tests/unit/standard/triggers/test_hitl.py @@ -25,12 +25,12 @@ pytest.skip("Human in the loop public API compatible with Airflow >= 3.0.1", allow_module_level=True) import asyncio -import logging from datetime import datetime, timedelta from unittest import mock from uuid6 import uuid7 +from airflow._shared.timezones.timezone import utc, utcnow from airflow.api_fastapi.execution_api.datamodels.hitl import HITLDetailResponse from airflow.providers.standard.triggers.hitl import ( HITLTrigger, @@ -38,7 +38,6 @@ HITLTriggerEventSuccessPayload, ) from airflow.triggers.base import TriggerEvent -from airflow.utils.timezone import utcnow TI_ID = uuid7() @@ -99,10 +98,9 @@ async def test_run_failed_due_to_timeout(self, mock_update, mock_supervisor_comm @pytest.mark.db_test @pytest.mark.asyncio + @mock.patch.object(HITLTrigger, "log") @mock.patch("airflow.sdk.execution_time.hitl.update_htil_detail_response") - async def test_run_fallback_to_default_due_to_timeout(self, mock_update, mock_supervisor_comms, caplog): - caplog.set_level(logging.INFO) - + async def test_run_fallback_to_default_due_to_timeout(self, mock_update, mock_log, mock_supervisor_comms): trigger = HITLTrigger( ti_id=TI_ID, options=["1", "2", "3", "4", "5"], @@ -132,16 +130,16 @@ async def test_run_fallback_to_default_due_to_timeout(self, mock_update, mock_su ) ) - assert ( - "[HITL] timeout reached before receiving response, fallback to default ['1']" in caplog.messages + assert mock_log.info.call_args == mock.call( + "[HITL] timeout reached before receiving response, fallback to default %s", ["1"] ) @pytest.mark.db_test @pytest.mark.asyncio + @mock.patch.object(HITLTrigger, "log") @mock.patch("airflow.sdk.execution_time.hitl.update_htil_detail_response") - async def test_run(self, mock_update, mock_supervisor_comms, time_machine, caplog): + async def test_run(self, mock_update, mock_log, mock_supervisor_comms, time_machine): time_machine.move_to(datetime(2025, 7, 29, 2, 0, 0)) - caplog.set_level(logging.INFO) trigger = HITLTrigger( ti_id=TI_ID, @@ -171,4 +169,9 @@ async def test_run(self, mock_update, mock_supervisor_comms, time_machine, caplo ) ) - assert "[HITL] user=test options=['3'] at 2025-07-29 02:00:00+00:00" in caplog.messages + assert mock_log.info.call_args == mock.call( + "[HITL] user=%s options=%s at %s", + "test", + ["3"], + datetime(2025, 7, 29, 2, 0, 0, tzinfo=utc), + ) From 2c7377a6f8dd4c1cc3bee9d6add6b321c012c9ea Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 29 Jul 2025 06:49:32 +0300 Subject: [PATCH 3/3] test(hitl): extract common args --- .../tests/unit/standard/triggers/test_hitl.py | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/providers/standard/tests/unit/standard/triggers/test_hitl.py b/providers/standard/tests/unit/standard/triggers/test_hitl.py index cb221bd1664a7..93d44ecc9f14e 100644 --- a/providers/standard/tests/unit/standard/triggers/test_hitl.py +++ b/providers/standard/tests/unit/standard/triggers/test_hitl.py @@ -40,18 +40,21 @@ from airflow.triggers.base import TriggerEvent TI_ID = uuid7() +default_trigger_args = { + "ti_id": TI_ID, + "options": ["1", "2", "3", "4", "5"], + "params": {"input": 1}, + "multiple": False, +} class TestHITLTrigger: def test_serialization(self): trigger = HITLTrigger( - ti_id=TI_ID, - options=["1", "2", "3", "4", "5"], - params={"input": 1}, defaults=["1"], - multiple=False, timeout_datetime=None, poke_interval=50.0, + **default_trigger_args, ) classpath, kwargs = trigger.serialize() assert classpath == "airflow.providers.standard.triggers.hitl.HITLTrigger" @@ -70,12 +73,9 @@ def test_serialization(self): @mock.patch("airflow.sdk.execution_time.hitl.update_htil_detail_response") async def test_run_failed_due_to_timeout(self, mock_update, mock_supervisor_comms): trigger = HITLTrigger( - ti_id=TI_ID, - options=["1", "2", "3", "4", "5"], - params={"input": 1}, - multiple=False, timeout_datetime=utcnow() + timedelta(seconds=0.1), poke_interval=5, + **default_trigger_args, ) mock_supervisor_comms.send.return_value = HITLDetailResponse( response_received=False, @@ -102,13 +102,10 @@ async def test_run_failed_due_to_timeout(self, mock_update, mock_supervisor_comm @mock.patch("airflow.sdk.execution_time.hitl.update_htil_detail_response") async def test_run_fallback_to_default_due_to_timeout(self, mock_update, mock_log, mock_supervisor_comms): trigger = HITLTrigger( - ti_id=TI_ID, - options=["1", "2", "3", "4", "5"], - params={"input": 1}, defaults=["1"], - multiple=False, timeout_datetime=utcnow() + timedelta(seconds=0.1), poke_interval=5, + **default_trigger_args, ) mock_supervisor_comms.send.return_value = HITLDetailResponse( response_received=False, @@ -142,13 +139,10 @@ async def test_run(self, mock_update, mock_log, mock_supervisor_comms, time_mach time_machine.move_to(datetime(2025, 7, 29, 2, 0, 0)) trigger = HITLTrigger( - ti_id=TI_ID, - options=["1", "2", "3", "4", "5"], - params={"input": 1}, defaults=["1"], - multiple=False, timeout_datetime=None, poke_interval=5, + **default_trigger_args, ) mock_supervisor_comms.send.return_value = HITLDetailResponse( response_received=True,