Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ dependencies = [
"eventlet>=0.37.0",
"gevent>=25.4.1",
"greenlet>=3.1.0",
"greenback>=1.2.1",
]
"graphviz" = [
# The graphviz package creates friction when installing on MacOS as it needs graphviz system package to
Expand Down
5 changes: 5 additions & 0 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,11 @@ async def block_watchdog(self):

async def run_trigger(self, trigger_id, trigger):
"""Run a trigger (they are async generators) and push their events into our outbound event deque."""
if not os.environ.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", "").lower() == "true":
import greenback

await greenback.ensure_portal()

bind_log_contextvars(trigger_id=trigger_id)

name = self.triggers[trigger_id]["name"]
Expand Down
11 changes: 5 additions & 6 deletions airflow-core/tests/unit/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,7 @@ def test_trigger_log(mock_monotonic, trigger, watcher_count, trigger_count, sess


class TestTriggerRunner:
@pytest.mark.asyncio
async def test_run_inline_trigger_canceled(self, session) -> None:
def test_run_inline_trigger_canceled(self, session) -> None:
trigger_runner = TriggerRunner()
trigger_runner.triggers = {
1: {"task": MagicMock(spec=asyncio.Task), "name": "mock_name", "events": 0}
Expand All @@ -313,10 +312,10 @@ async def test_run_inline_trigger_canceled(self, session) -> None:
mock_trigger.run.side_effect = asyncio.CancelledError()

with pytest.raises(asyncio.CancelledError):
await trigger_runner.run_trigger(1, mock_trigger)
asyncio.run(trigger_runner.run_trigger(1, mock_trigger))

@pytest.mark.asyncio
async def test_run_inline_trigger_timeout(self, session, cap_structlog) -> None:
# @pytest.mark.asyncio
def test_run_inline_trigger_timeout(self, session, cap_structlog) -> None:
trigger_runner = TriggerRunner()
trigger_runner.triggers = {
1: {"task": MagicMock(spec=asyncio.Task), "name": "mock_name", "events": 0}
Expand All @@ -326,7 +325,7 @@ async def test_run_inline_trigger_timeout(self, session, cap_structlog) -> None:
mock_trigger.run.side_effect = asyncio.CancelledError()

with pytest.raises(asyncio.CancelledError):
await trigger_runner.run_trigger(1, mock_trigger)
asyncio.run(trigger_runner.run_trigger(1, mock_trigger))
assert {"event": "Trigger cancelled due to timeout", "log_level": "error"} in cap_structlog

@patch("airflow.jobs.triggerer_job_runner.Trigger._decrypt_kwargs")
Expand Down
19 changes: 19 additions & 0 deletions task-sdk/src/airflow/sdk/definitions/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

import asyncio
import json
import logging
from json import JSONDecodeError
Expand Down Expand Up @@ -203,6 +204,24 @@ def get(cls, conn_id: str) -> Any:
return _get_connection(conn_id)
except AirflowRuntimeError as e:
cls._handle_connection_error(e, conn_id)
except RuntimeError as e:
# The error from async_to_sync is a RuntimeError, so we have to fall back to text matching
if str(e).startswith("You cannot use AsyncToSync in the same thread as an async event loop"):
import greenback

task = asyncio.current_task()
if greenback.has_portal(task):
import warnings

warnings.warn(
"You should not use sync calls here -- use `await Conn.async_get` instead",
stacklevel=2,
)

return greenback.await_(cls.async_get(conn_id))

log.exception("async_to_sync failed")
raise

@classmethod
async def async_get(cls, conn_id: str) -> Any:
Expand Down