Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mTLS support to WebHDFSHook #44561

Merged
merged 4 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/apache-airflow-providers-apache-hdfs/connections.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,5 @@ Extra (optional, connection parameters)

* ``use_ssl`` - Whether SSL should be used. Defaults to ``false``.
* ``verify`` - How to verify SSL. For more information refer to https://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification.
* ``cert`` - Path to the client certificate for mTLS. This can be a combined certificate-and-key file, or a certificate file used together with ``key``.
* ``key`` - Path to the client private key for mTLS. Used together with ``cert``.
2 changes: 2 additions & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,8 @@ moto
mouseover
msg
mssql
mTLS
mtls
muldelete
Multinamespace
mutex
Expand Down
20 changes: 20 additions & 0 deletions providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,35 @@ def _find_valid_server(self) -> Any:
def _get_client(
self, namenode: str, port: int, login: str, password: str | None, schema: str, extra_dejson: dict
) -> Any:
"""
Get WebHDFS client.

Additional options via ``extra``:
- use_ssl: enable SSL connection (default: False)
- verify: CA certificate path or boolean for SSL verification (default: False)
- cert: client certificate path for mTLS, can be combined cert or used with ``key``
- key: client key path for mTLS with ``cert``
"""
connection_str = f"http://{namenode}"
session = requests.Session()

if password is not None:
session.auth = (login, password)

if extra_dejson.get("use_ssl", "False") == "True" or extra_dejson.get("use_ssl", False):
connection_str = f"https://{namenode}"
session.verify = extra_dejson.get("verify", False)

# Handle mTLS certificates
cert = extra_dejson.get("cert")
key = extra_dejson.get("key")

if cert:
if key:
session.cert = (cert, key)
else:
session.cert = cert

if port is not None:
connection_str += f":{port}"

Expand Down
48 changes: 48 additions & 0 deletions providers/tests/apache/hdfs/hooks/test_webhdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,51 @@ def test_conn_insecure_ssl_without_schema(self, socket_mock, mock_insecure_clien

assert f"https://{connection.host}:{connection.port}" == mock_insecure_client.call_args.args[0]
assert not mock_insecure_client.call_args.kwargs["session"].verify

@patch("airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient")
@patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
def test_conn_mtls_cert_and_key(self, socket_mock, mock_insecure_client):
    """Check that ``cert`` and ``key`` extras end up on the session as a ``(cert, key)`` pair."""
    cert_path = "/path/to/cert.pem"
    key_path = "/path/to/key.pem"
    mtls_connection = Connection(
        host="host_1",
        port=123,
        extra={
            "use_ssl": "True",
            "cert": cert_path,
            "key": key_path,
        },
    )
    # Make the mocked socket's connect_ex report success (0).
    socket_mock.socket.return_value.connect_ex.return_value = 0

    with patch(
        "airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection",
        return_value=mtls_connection,
    ):
        self.webhdfs_hook.get_conn()

    client_call = mock_insecure_client.call_args
    expected_url = f"https://{mtls_connection.host}:{mtls_connection.port}"
    assert client_call.args[0] == expected_url
    assert client_call.kwargs["session"].cert == (cert_path, key_path)

@patch("airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient")
@patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket")
def test_conn_mtls_combined_cert(self, socket_mock, mock_insecure_client):
    """Check that a ``cert`` extra without ``key`` is set on the session as a plain path string."""
    combined_pem = "/path/to/combined.pem"
    mtls_connection = Connection(
        host="host_1",
        port=123,
        extra={
            "use_ssl": "True",
            "cert": combined_pem,
        },
    )
    # Make the mocked socket's connect_ex report success (0).
    socket_mock.socket.return_value.connect_ex.return_value = 0

    with patch(
        "airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection",
        return_value=mtls_connection,
    ):
        self.webhdfs_hook.get_conn()

    client_call = mock_insecure_client.call_args
    expected_url = f"https://{mtls_connection.host}:{mtls_connection.port}"
    assert client_call.args[0] == expected_url
    # The expected value is a single string, not a one-element tuple.
    assert client_call.kwargs["session"].cert == combined_pem