Skip to content

Commit

Permalink
Add mTLS support to WebHDFSHook (#44561)
Browse files Browse the repository at this point in the history
* WebHDFS Hook - mTLS Support

* Static check fixes
  • Loading branch information
markhatch authored Dec 2, 2024
1 parent aac8098 commit 5b898ad
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/apache-airflow-providers-apache-hdfs/connections.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,5 @@ Extra (optional, connection parameters)

* ``use_ssl`` - Whether SSL should be used. Defaults to ``false``.
* ``verify`` - How to verify SSL. For more information refer to https://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification.
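* ``cert`` - Path to the client certificate for mTLS. This can be either a combined file containing both the certificate and the private key, or a certificate-only file used together with ``key``.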
* ``key`` - Path to the client private key for mTLS. Used together with ``cert``.
2 changes: 2 additions & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,8 @@ moto
mouseover
msg
mssql
mTLS
mtls
muldelete
Multinamespace
mutex
Expand Down
20 changes: 20 additions & 0 deletions providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,35 @@ def _find_valid_server(self) -> Any:
def _get_client(
self, namenode: str, port: int, login: str, password: str | None, schema: str, extra_dejson: dict
) -> Any:
"""
Get WebHDFS client.
Additional options via ``extra``:
- use_ssl: enable SSL connection (default: False)
- verify: CA certificate path or boolean for SSL verification (default: False)
- cert: client certificate path for mTLS, can be combined cert or used with ``key``
- key: client key path for mTLS with ``cert``
"""
connection_str = f"http://{namenode}"
session = requests.Session()

if password is not None:
session.auth = (login, password)

if extra_dejson.get("use_ssl", "False") == "True" or extra_dejson.get("use_ssl", False):
connection_str = f"https://{namenode}"
session.verify = extra_dejson.get("verify", False)

# Handle mTLS certificates
cert = extra_dejson.get("cert")
key = extra_dejson.get("key")

if cert:
if key:
session.cert = (cert, key)
else:
session.cert = cert

if port is not None:
connection_str += f":{port}"

Expand Down
48 changes: 48 additions & 0 deletions providers/tests/apache/hdfs/hooks/test_webhdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,51 @@ def test_conn_insecure_ssl_without_schema(self, socket_mock, mock_insecure_clien

assert f"https://{connection.host}:{connection.port}" == mock_insecure_client.call_args.args[0]
assert not mock_insecure_client.call_args.kwargs["session"].verify

@patch("airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient")
@patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
def test_conn_mtls_cert_and_key(self, socket_mock, mock_insecure_client):
    """Separate ``cert`` and ``key`` extras must reach the session as a ``(cert, key)`` pair."""
    client_cert = "/path/to/cert.pem"
    client_key = "/path/to/key.pem"
    mtls_connection = Connection(
        host="host_1",
        port=123,
        extra={
            "use_ssl": "True",
            "cert": client_cert,
            "key": client_key,
        },
    )
    # The mocked socket's connect_ex reports 0, the socket API's success code.
    socket_mock.socket.return_value.connect_ex.return_value = 0

    with patch(
        "airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection",
        return_value=mtls_connection,
    ):
        self.webhdfs_hook.get_conn()
        client_call = mock_insecure_client.call_args

        # The client URL must use the https scheme with the connection's host and port.
        expected_url = f"https://{mtls_connection.host}:{mtls_connection.port}"
        assert expected_url == client_call.args[0]
        # Both paths are handed to the requests session as a 2-tuple.
        assert client_call.kwargs["session"].cert == (client_cert, client_key)

@patch("airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient")
@patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
def test_conn_mtls_combined_cert(self, socket_mock, mock_insecure_client):
    """A lone ``cert`` extra (combined cert and key file) must reach the session as a plain string."""
    combined_pem = "/path/to/combined.pem"
    mtls_connection = Connection(
        host="host_1",
        port=123,
        extra={
            "use_ssl": "True",
            "cert": combined_pem,
        },
    )
    # The mocked socket's connect_ex reports 0, the socket API's success code.
    socket_mock.socket.return_value.connect_ex.return_value = 0

    with patch(
        "airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection",
        return_value=mtls_connection,
    ):
        self.webhdfs_hook.get_conn()
        client_call = mock_insecure_client.call_args

        # The client URL must use the https scheme with the connection's host and port.
        expected_url = f"https://{mtls_connection.host}:{mtls_connection.port}"
        assert expected_url == client_call.args[0]
        # With no ``key`` extra, the session's cert is the bare path string, not a tuple.
        assert client_call.kwargs["session"].cert == combined_pem

0 comments on commit 5b898ad

Please sign in to comment.