Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from typing import TYPE_CHECKING, Any, Literal

from deprecated import deprecated
from sqlalchemy.engine import URL
from typing_extensions import overload

from airflow.configuration import conf
Expand Down Expand Up @@ -1131,3 +1132,25 @@ def get_pandas_df( # type: ignore
**kwargs,
) -> pd.DataFrame:
return self._get_pandas_df(sql, schema=schema, hive_conf=hive_conf, **kwargs)

@property
def sqlalchemy_url(self) -> URL:
"""Return a `sqlalchemy.engine.URL` object constructed from the connection."""
conn = self.get_connection(self.get_conn_id())
extra = conn.extra_dejson or {}

query = {k: str(v) for k, v in extra.items() if v is not None and k != "__extra__"}

return URL.create(
drivername="hive",
username=conn.login,
password=conn.password,
host=conn.host,
port=conn.port,
database=conn.schema,
query=query,
)

def get_uri(self) -> str:
"""Return a SQLAlchemy engine URL as a string."""
return self.sqlalchemy_url.render_as_string(hide_password=False)
21 changes: 21 additions & 0 deletions providers/apache/hive/tests/unit/apache/hive/hooks/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,27 @@ def test_get_results_with_hive_conf(self):
assert f"test_{date_key}" in output
assert "test_dag_run_id" in output

def test_sqlalchemy_uri(self):
"""Test sqlalchemy_url with connection parameters"""

with mock.patch.object(HiveServer2Hook, "get_connection") as mock_get_conn:
mock_get_conn.return_value = Connection(
conn_id="test_hive_conn",
conn_type="hive_cli",
host="localhost",
port=10000,
schema="default",
login="admin",
password="admin",
)
hook = HiveServer2Hook()
uri = hook.sqlalchemy_url
assert uri.host == "localhost"
assert uri.port == 10000
assert uri.database == "default"
assert uri.username == "admin"
assert uri.password == "admin"


@pytest.mark.db_test
@mock.patch.dict("os.environ", AIRFLOW__CORE__SECURITY="kerberos")
Expand Down