diff --git a/docs/apache-airflow-providers-apache-hdfs/connections.rst b/docs/apache-airflow-providers-apache-hdfs/connections.rst index c67331aaedfd3..28b557c46378c 100644 --- a/docs/apache-airflow-providers-apache-hdfs/connections.rst +++ b/docs/apache-airflow-providers-apache-hdfs/connections.rst @@ -43,3 +43,5 @@ Extra (optional, connection parameters) * ``use_ssl`` - If SSL should be used. By default is set to `false`. * ``verify`` - How to verify SSL. For more information refer to https://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification. + * ``cert`` - Path to the client certificate for mTLS. This is either a combined certificate-and-key file, or a certificate file used together with ``key``. + * ``key`` - Path to the client private key for mTLS. Used together with ``cert``. diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index debbd43c896e4..e02e6cee380e1 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1058,6 +1058,8 @@ moto mouseover msg mssql +mTLS +mtls muldelete Multinamespace mutex diff --git a/providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py b/providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py index 3a996dddc77c6..47ab97db4e8df 100644 --- a/providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py +++ b/providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py @@ -105,8 +105,18 @@ def _find_valid_server(self) -> Any: def _get_client( self, namenode: str, port: int, login: str, password: str | None, schema: str, extra_dejson: dict ) -> Any: + """ + Get WebHDFS client. 
+ + Additional options via ``extra``: + - use_ssl: enable SSL connection (default: False) + - verify: CA certificate path or boolean for SSL verification (default: False) + - cert: client certificate path for mTLS, can be combined cert or used with ``key`` + - key: client key path for mTLS with ``cert`` + """ connection_str = f"http://{namenode}" session = requests.Session() + if password is not None: session.auth = (login, password) @@ -114,6 +124,16 @@ def _get_client( connection_str = f"https://{namenode}" session.verify = extra_dejson.get("verify", False) + # Handle mTLS certificates + cert = extra_dejson.get("cert") + key = extra_dejson.get("key") + + if cert: + if key: + session.cert = (cert, key) + else: + session.cert = cert + if port is not None: connection_str += f":{port}" diff --git a/providers/tests/apache/hdfs/hooks/test_webhdfs.py b/providers/tests/apache/hdfs/hooks/test_webhdfs.py index 6eb32328cdccd..80b4cef76cad9 100644 --- a/providers/tests/apache/hdfs/hooks/test_webhdfs.py +++ b/providers/tests/apache/hdfs/hooks/test_webhdfs.py @@ -261,3 +261,51 @@ def test_conn_insecure_ssl_without_schema(self, socket_mock, mock_insecure_clien assert f"https://{connection.host}:{connection.port}" == mock_insecure_client.call_args.args[0] assert not mock_insecure_client.call_args.kwargs["session"].verify + + @patch("airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient") + @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket") + def test_conn_mtls_cert_and_key(self, socket_mock, mock_insecure_client): + """Test mTLS configuration with client cert and key""" + with patch( + "airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection", + return_value=Connection( + host="host_1", + port=123, + extra={ + "use_ssl": "True", + "cert": "/path/to/cert.pem", + "key": "/path/to/key.pem", + }, + ), + ) as mock_get_connection: + socket_mock.socket.return_value.connect_ex.return_value = 0 + self.webhdfs_hook.get_conn() + connection = 
mock_get_connection.return_value + + assert f"https://{connection.host}:{connection.port}" == mock_insecure_client.call_args.args[0] + assert mock_insecure_client.call_args.kwargs["session"].cert == ( + "/path/to/cert.pem", + "/path/to/key.pem", + ) + + @patch("airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient") + @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket") + def test_conn_mtls_combined_cert(self, socket_mock, mock_insecure_client): + """Test mTLS configuration with combined client cert and key""" + with patch( + "airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection", + return_value=Connection( + host="host_1", + port=123, + extra={ + "use_ssl": "True", + "cert": "/path/to/combined.pem", + }, + ), + ) as mock_get_connection: + socket_mock.socket.return_value.connect_ex.return_value = 0 + self.webhdfs_hook.get_conn() + connection = mock_get_connection.return_value + + assert f"https://{connection.host}:{connection.port}" == mock_insecure_client.call_args.args[0] + assert mock_insecure_client.call_args.kwargs["session"].cert == ("/path/to/combined.pem")