diff --git a/providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py b/providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py index a42be8d4ff49e..e2ad488b448ff 100644 --- a/providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py +++ b/providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py @@ -29,6 +29,7 @@ from typing import TYPE_CHECKING, Any, Literal from deprecated import deprecated +from sqlalchemy.engine import URL from typing_extensions import overload from airflow.configuration import conf @@ -1131,3 +1132,25 @@ def get_pandas_df( # type: ignore **kwargs, ) -> pd.DataFrame: return self._get_pandas_df(sql, schema=schema, hive_conf=hive_conf, **kwargs) + + @property + def sqlalchemy_url(self) -> URL: + """Return a `sqlalchemy.engine.URL` object constructed from the connection.""" + conn = self.get_connection(self.get_conn_id()) + extra = conn.extra_dejson or {} + + query = {k: str(v) for k, v in extra.items() if v is not None and k != "__extra__"} + + return URL.create( + drivername="hive", + username=conn.login, + password=conn.password, + host=conn.host, + port=conn.port, + database=conn.schema, + query=query, + ) + + def get_uri(self) -> str: + """Return a SQLAlchemy engine URL as a string.""" + return self.sqlalchemy_url.render_as_string(hide_password=False) diff --git a/providers/apache/hive/tests/unit/apache/hive/hooks/test_hive.py b/providers/apache/hive/tests/unit/apache/hive/hooks/test_hive.py index 0d95384fab865..a6ef4caa45648 100644 --- a/providers/apache/hive/tests/unit/apache/hive/hooks/test_hive.py +++ b/providers/apache/hive/tests/unit/apache/hive/hooks/test_hive.py @@ -885,6 +885,27 @@ def test_get_results_with_hive_conf(self): assert f"test_{date_key}" in output assert "test_dag_run_id" in output + def test_sqlalchemy_uri(self): + """Test sqlalchemy_url with connection parameters""" + + with mock.patch.object(HiveServer2Hook, "get_connection") as mock_get_conn: + mock_get_conn.return_value = Connection( + conn_id="test_hive_conn", + conn_type="hive_cli", + host="localhost", + port=10000, + schema="default", + login="admin", + password="admin", + ) + hook = HiveServer2Hook() + uri = hook.sqlalchemy_url + assert uri.host == "localhost" + assert uri.port == 10000 + assert uri.database == "default" + assert uri.username == "admin" + assert uri.password == "admin" + @pytest.mark.db_test @mock.patch.dict("os.environ", AIRFLOW__CORE__SECURITY="kerberos")