Connection not available in triggerer #57145

@vatsrahul1001

Description

Apache Airflow version

3.1.1rc1

If "Other Airflow 2/3 version" selected, which one?

No response

What happened?

I’m getting a “connection not found” error from the triggerer in 3.1.1rc1 when an async (deferrable) DAG uses a UI-created connection. The same DAG works fine with deferrable=False, or when the connection is supplied via an environment variable.
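
For reference, the environment-variable route that does work sets AIRFLOW_CONN_DATABRICKS_DEFAULT to the connection's URI form before the triggerer starts. A minimal sketch of building that value with Connection.get_uri() (host and token below are placeholders, not from my setup):

# Sketch of the working environment-variable route (placeholder host/token).
# Airflow resolves AIRFLOW_CONN_<CONN_ID> from the environment before
# falling back to the metadata DB.
from airflow.models import Connection

conn = Connection(
    conn_id="databricks_default",
    conn_type="databricks",
    host="https://adb-1234567890123456.7.azuredatabricks.net",  # placeholder workspace URL
    password="dapiXXXXXXXXXXXX",  # placeholder personal access token
)
# Export the printed value in the triggerer environment, e.g.
# AIRFLOW_CONN_DATABRICKS_DEFAULT='<output of get_uri()>'
print(conn.get_uri())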

Error logs

[2025-10-23 14:49:13] ERROR - Trigger example_async_databricks/manual__2025-10-23T09:19:01+00:00/submit_run/-1/1 (ID 1) exited with error The conn_id `databricks_default` isn't defined loc=triggerer_job_runner.py:1001
AirflowNotFoundException: The conn_id `databricks_default` isn't defined

File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 992 in cleanup_finished_triggers

File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 116 in greenback_shim

File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 201 in _greenback_shim

File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 81 in trampoline

File "/usr/python/lib/python3.10/site-packages/outcome/_impl.py", line 185 in send

File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 1106 in run_trigger

File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/triggers/databricks.py", line 90 in run

File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks.py", line 524 in a_get_run_state

File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 713 in _a_do_api_call

File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 623 in _endpoint_url

File "/usr/python/lib/python3.10/functools.py", line 981 in __get__

File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 142 in databricks_conn

File "/opt/airflow/task-sdk/src/airflow/sdk/bases/hook.py", line 61 in get_connection

File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line 226 in get

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 172 in _get_connection
[2025-10-23 14:49:13] ERROR - Trigger exited without sending an event. Dependent tasks will be failed. name=example_async_databricks/manual__2025-10-23T09:19:01+00:00/submit_run/-1/1 (ID 1) loc=triggerer_job_runner.py:1016
[2025-10-23 14:49:14] INFO - DAG bundles loaded: dags-folder source=airflow.dag_processing.bundles.manager.DagBundlesManager loc=manager.py:179
[2025-10-23 14:49:14] INFO - Filling up the DagBag from /files/dags/example_databricks.py source=airflow.models.dagbag.DagBag loc=dagbag.py:593
[2025-10-23 14:49:14] WARNING - The `airflow.utils.timezone.datetime` attribute is deprecated. Please use `'airflow.sdk.timezone.datetime'`. category=DeprecatedImportWarning source=py.warnings loc=/files/dags/example_databricks.py:7
[2025-10-23 14:49:14] ERROR - Trigger failed:
Traceback (most recent call last):

  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 992, in cleanup_finished_triggers
    result = details["task"].result()

  File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 116, in greenback_shim
    return await _greenback_shim(orig_coro, next_send)  # type: ignore

  File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 201, in _greenback_shim
    next_yield, resume_greenlet = resume_greenlet.switch(next_send)

  File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 81, in trampoline
    next_yield: Any = next_send.send(orig_coro)  # type: ignore

  File "/usr/python/lib/python3.10/site-packages/outcome/_impl.py", line 185, in send
    return gen.send(self.value)

  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 1106, in run_trigger
    async for event in trigger.run():

  File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/triggers/databricks.py", line 90, in run
    run_state = await self.hook.a_get_run_state(self.run_id)

  File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks.py", line 524, in a_get_run_state
    response = await self._a_do_api_call(GET_RUN_ENDPOINT, json)

  File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 713, in _a_do_api_call
    url = self._endpoint_url(full_endpoint)

  File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 623, in _endpoint_url
    port = f":{self.databricks_conn.port}" if self.databricks_conn.port else ""

  File "/usr/python/lib/python3.10/functools.py", line 981, in __get__
    val = self.func(instance)

  File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 142, in databricks_conn
    return self.get_connection(self.databricks_conn_id)  # type: ignore[return-value]

  File "/opt/airflow/task-sdk/src/airflow/sdk/bases/hook.py", line 61, in get_connection
    conn = Connection.get(conn_id)

  File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line 226, in get
    return _get_connection(conn_id)

  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 172, in _get_connection
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")

airflow.exceptions.AirflowNotFoundException: The conn_id `databricks_default` isn't defined

What do you think should happen instead?

No response

How to reproduce

  1. Create a databricks_default connection from the UI.
  2. Run the DAG below (a minimal trigger isolating the failing lookup is sketched after it):
import json
import os
from datetime import timedelta
from typing import Dict, Optional

from airflow.models.dag import DAG
from airflow.utils.timezone import datetime  # deprecated in 3.x; source of the DeprecatedImportWarning in the logs above

from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator, DatabricksRunNowOperator

DATABRICKS_CONN_ID = os.getenv("ASTRO_DATABRICKS_CONN_ID", "databricks_default")
# Notebook path as a Json object
notebook_task = '{"notebook_path": "/Shared/Notebook_1"}'
NOTEBOOK_TASK = json.loads(os.getenv("DATABRICKS_NOTEBOOK_TASK", notebook_task))
notebook_params: Optional[Dict[str, str]] = {"Variable": "5"}
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}

new_cluster = {
    "num_workers": 1,
    "spark_version": "10.4.x-scala2.12",
    "spark_conf": {},
    "azure_attributes": {
        "availability": "ON_DEMAND_AZURE",
        "spot_bid_max_price": -1,
    },
    "node_type_id": "Standard_D3_v2",
    "ssh_public_keys": [],
    "custom_tags": {},
    "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
    "cluster_source": "JOB",
    "init_scripts": [],
}


with DAG(
    dag_id="example_async_databricks",
    start_date=datetime(2022, 1, 1),
    schedule=None,
    catchup=False,
    default_args=default_args,
    tags=["example", "async", "databricks"],
) as dag:
    # [START howto_operator_databricks_submit_run_async]
    opr_submit_run = DatabricksSubmitRunOperator(
        task_id="submit_run",
        databricks_conn_id=DATABRICKS_CONN_ID,
        new_cluster=new_cluster,
        notebook_task=NOTEBOOK_TASK,
        do_xcom_push=True,
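        # With deferrable=True, run-state polling is handed to the triggerer,
        # which is where the conn_id lookup fails (see traceback above).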
        deferrable=True,
    )
    # [END howto_operator_databricks_submit_run_async]

    # [START howto_operator_databricks_run_now_async]
    opr_run_now = DatabricksRunNowOperator(
        task_id="run_now",
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_id="{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}",
        notebook_params=notebook_params,
        deferrable=True
    )
    # [END howto_operator_databricks_run_now_async]

opr_submit_run >> opr_run_now
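
The failure does not look Databricks-specific: any trigger that resolves a connection inside run() should hit the same Task SDK path shown in the traceback. A minimal, hypothetical trigger (the conn_check module path is made up) that isolates the lookup:

# Hypothetical minimal trigger isolating the failing lookup path.
# Deferring to this from any operator should raise the same
# AirflowNotFoundException in the triggerer when the connection
# exists only in the metadata DB (UI-created).
from airflow.triggers.base import BaseTrigger, TriggerEvent


class ConnCheckTrigger(BaseTrigger):
    def __init__(self, conn_id: str = "databricks_default"):
        super().__init__()
        self.conn_id = conn_id

    def serialize(self):
        # Module path is hypothetical; adjust to wherever this class lives.
        return ("conn_check.ConnCheckTrigger", {"conn_id": self.conn_id})

    async def run(self):
        from airflow.sdk import Connection  # Task SDK lookup, as in the traceback

        conn = Connection.get(self.conn_id)  # raises AirflowNotFoundException here
        yield TriggerEvent({"host": conn.host})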

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
