RuntimeError when creating hooks in a trigger, Airflow 3.0.3 #53447

@opeida

Apache Airflow version

3.0.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

The error RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly. occurs when BigQueryHook is initialized inside async def run(self). The issue appeared immediately after upgrading from 3.0.2 to 3.0.3.

class BigQueryTaskStatusTrigger(BaseTrigger):
    async def run(self) -> AsyncIterator[TriggerEvent]:
        try:
            hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id, use_legacy_sql=False)
            while True:
                status = await self._get_status(hook)

The error can be worked around by initializing BigQueryHook inside a function decorated with @sync_to_async, but that does not look like best practice:

from asgiref.sync import sync_to_async

@sync_to_async
def _get_status(self) -> str | None:
    hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id, use_legacy_sql=False)
    self.log.info(f"Retrieving status for task {self.monitored_task_id}")
    bq_client = hook.get_client(project_id=self.project_id)
    job = bq_client.query(self.query)
    result = list(job.result())
    return result[0].status if result else None
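For what it's worth, the workaround seems to help because the hook's synchronous get_connection() path ends up in SUPERVISOR_COMMS.send(), which calls asgiref's async_to_sync(), and asgiref refuses to do that in a thread that already has a running event loop (the triggerer's loop). Wrapping the call in sync_to_async moves it onto a worker thread where no loop is running. Below is a minimal, Airflow-free sketch of that constraint; the fetch/blocking_lookup names are made up for illustration:

# Minimal, Airflow-free sketch of the asgiref constraint (illustrative names):
import asyncio
import threading

from asgiref.sync import async_to_sync, sync_to_async


async def fetch() -> int:
    return 42


def blocking_lookup() -> int:
    # Runs in an asgiref worker thread, so there is no running event loop here
    # and async_to_sync is allowed again, same idea as the @sync_to_async workaround.
    print("worker thread:", threading.current_thread().name)
    return async_to_sync(fetch)()


async def main() -> None:
    try:
        # Same situation as BigQueryHook.__init__ -> get_connection ->
        # SUPERVISOR_COMMS.send inside the triggerer's event loop:
        async_to_sync(fetch)()
    except RuntimeError as err:
        print(f"RuntimeError: {err}")

    print(await sync_to_async(blocking_lookup)())  # prints 42


asyncio.run(main())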

What you think should happen instead?

The hook should be initialized without errors.

How to reproduce

dag.py:

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator

from .sensors import BigQueryTaskStatusSensor

with DAG(
    "trigger_test_dag",
) as dag:

    start = EmptyOperator(task_id="start")

    check_task_completed = BigQueryTaskStatusSensor(
        task_id="check_task_completed",
        monitored_task_id="test-task-1",
        deferrable=True
    )

    end = EmptyOperator(task_id="end", trigger_rule="none_failed")

    start >> check_task_completed >> end


if __name__ == "__main__":
    dag.test()

sensors.py:

from collections.abc import Sequence
from datetime import timedelta
from typing import Any

from airflow.configuration import conf
from airflow.exceptions import AirflowFailException
from airflow.sdk.definitions.context import Context
from airflow.sensors.base import BaseSensorOperator

from .triggers import BigQueryTaskStatusTrigger


class BigQueryTaskStatusSensor(BaseSensorOperator):

    template_fields: Sequence[str] = ("monitored_task_id", "gcp_conn_id", "project_id", "dataset", "table")

    def __init__(
        self,
        *,
        monitored_task_id: str,
        gcp_conn_id: str = "google_cloud_default",
        project_id: str = "automation",
        dataset: str = "cloud_functions_logs",
        table: str = "airflow_transfer_data_to_bq",
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.monitored_task_id = monitored_task_id
        self.gcp_conn_id = gcp_conn_id
        self.project_id = project_id
        self.dataset = dataset
        self.table = table
        self.deferrable = deferrable

    def execute(self, context: Context) -> Any:
        if self.deferrable:
            self.defer(
                trigger=BigQueryTaskStatusTrigger(
                    monitored_task_id=self.monitored_task_id,
                    gcp_conn_id=self.gcp_conn_id,
                    project_id=self.project_id,
                    query=self._get_sql_query(),
                    target_status="SUCCESS",
                    fail_status="FAILED",
                    poke_interval=15
                    ),
                timeout=timedelta(minutes=15),
                method_name="execute_complete"
            )
        else:
            raise AirflowFailException("The sensor must be deferrable.")

    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> Any:
        return

    def _get_sql_query(self) -> str:
        return f"""SELECT status
            FROM `{self.project_id}.{self.dataset}.{self.table}`
            WHERE task_name = '{self.monitored_task_id}'
            ORDER BY created_at DESC
            LIMIT 1
            ;
        """

triggers.py:

import asyncio
from collections.abc import AsyncIterator
from typing import Any

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.triggers.base import BaseTrigger, TaskFailedEvent, TriggerEvent


class BigQueryTaskStatusTrigger(BaseTrigger):

    def __init__(
        self,
        monitored_task_id: str,
        gcp_conn_id: str,
        project_id: str,
        query: str,
        target_status: str,
        fail_status: str,
        poke_interval: float,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.monitored_task_id = monitored_task_id
        self.gcp_conn_id = gcp_conn_id
        self.project_id = project_id
        self.query = query
        self.target_status = target_status
        self.fail_status = fail_status
        self.poke_interval = poke_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        return (
            "common.airflow_custom.triggers.BigQueryTaskStatusTrigger",
            {
                "monitored_task_id": self.monitored_task_id,
                "gcp_conn_id": self.gcp_conn_id,
                "project_id": self.project_id,
                "query": self.query,
                "target_status": self.target_status,
                "fail_status": self.fail_status,
                "poke_interval": self.poke_interval,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        try:
            hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id, use_legacy_sql=False)
            while True:
                status = await self._get_status(hook)
                if not status:
                    yield TaskFailedEvent(
                        f"An error occurred while receiving status for task: {self.monitored_task_id}"
                    )
                    return
                match status:
                    case self.target_status:
                        yield TriggerEvent({"monitored_task_id": self.monitored_task_id})
                        return
                    case self.fail_status:
                        yield TaskFailedEvent(f"Task {self.monitored_task_id} failed")
                        return
                    case _:
                        self.log.info(f"Task {self.monitored_task_id} is in status {status}")
                await asyncio.sleep(self.poke_interval)
        except Exception as e:
            yield TaskFailedEvent(f"Something went wrong.\n{str(e)}")
            return

    async def _get_status(self, hook: BigQueryHook) -> str | None:
        self.log.info(f"Retrieving status for task {self.monitored_task_id}")
        bq_client = hook.get_client(project_id=self.project_id)
        job = bq_client.query(self.query)
        result = list(job.result())
        return result[0].status if result else None

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

apache-airflow-providers-celery==3.12.1
apache-airflow-providers-common-compat==1.7.2
apache-airflow-providers-common-io==1.6.1
apache-airflow-providers-common-sql==1.27.3
apache-airflow-providers-fab==2.3.0
apache-airflow-providers-google==16.1.0
apache-airflow-providers-http==5.3.2
apache-airflow-providers-postgres==6.2.1
apache-airflow-providers-redis==4.1.1
apache-airflow-providers-smtp==2.1.1
apache-airflow-providers-standard==1.4.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

Deployed on GKE with extended image based on apache/airflow:slim-3.0.3-python3.12 and Helm chart 1.17.0

Anything else?

Trigger failed:
Traceback (most recent call last):
  File "/opt/airflow/dags/repo/dags/common/airflow_custom/triggers.py", line 47, in run
    hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id, use_legacy_sql=False)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 167, in __init__
    super().__init__(**kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/common/hooks/base_google.py", line 284, in __init__
    self.extras: dict = self.get_connection(self.gcp_conn_id).extra_dejson
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/hooks/base.py", line 64, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py", line 472, in get_connection_from_secrets
    conn = TaskSDKConnection.get(conn_id=conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/definitions/connection.py", line 142, in get
    return _get_connection(conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/context.py", line 155, in _get_connection
    msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 713, in send
    return async_to_sync(self.asend)(msg)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py", line 186, in __call__
    raise RuntimeError(
RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 936, in cleanup_finished_triggers
    result = details["task"].result()
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 1045, in run_trigger
    async for event in trigger.run():
  File "/opt/airflow/dags/repo/dags/common/airflow_custom/triggers.py", line 66, in run
    yield TaskFailedEvent(f"Something went wrong.\n{str(e)}")
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: BaseTaskEndEvent.__init__() takes 1 positional argument but 2 were given
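The secondary TypeError is separate from the regression itself: TaskFailedEvent is a BaseTaskEndEvent, which, as far as I can tell, takes no positional message (only an optional xcoms keyword), so the except branch in run() ends up masking the original RuntimeError. A small sketch of constructions that should not trip over it (the payload keys are arbitrary):

from airflow.triggers.base import TaskFailedEvent, TriggerEvent

# TaskFailedEvent carries the FAILED terminal state itself, so it is
# constructed without a message:
failed = TaskFailedEvent()

# If the error details are needed downstream, a plain TriggerEvent with a
# payload dict keeps them without fighting the constructor:
errored = TriggerEvent({"status": "error", "message": "Something went wrong"})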

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
