From 8b1d634fa18b2ce31d98024f8477e93c815fafbe Mon Sep 17 00:00:00 2001 From: Mark Hatch <06.swivel-robots@icloud.com> Date: Fri, 29 Nov 2024 11:21:24 +0800 Subject: [PATCH 1/2] WebHDFS Hook - mTLS Support --- .../connections.rst | 2 + .../providers/apache/hdfs/hooks/webhdfs.py | 20 ++++++++ .../tests/apache/hdfs/hooks/test_webhdfs.py | 47 +++++++++++++++++++ 3 files changed, 69 insertions(+) diff --git a/docs/apache-airflow-providers-apache-hdfs/connections.rst b/docs/apache-airflow-providers-apache-hdfs/connections.rst index c67331aaedfd3..28b557c46378c 100644 --- a/docs/apache-airflow-providers-apache-hdfs/connections.rst +++ b/docs/apache-airflow-providers-apache-hdfs/connections.rst @@ -43,3 +43,5 @@ Extra (optional, connection parameters) * ``use_ssl`` - If SSL should be used. By default is set to `false`. * ``verify`` - How to verify SSL. For more information refer to https://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification. + * ``cert`` - Client certificate path for mTLS, can be combined cert or used with ``key`` + * ``key`` - Client key path for mTLS with ``cert`` diff --git a/providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py b/providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py index 3a996dddc77c6..47ab97db4e8df 100644 --- a/providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py +++ b/providers/src/airflow/providers/apache/hdfs/hooks/webhdfs.py @@ -105,8 +105,18 @@ def _find_valid_server(self) -> Any: def _get_client( self, namenode: str, port: int, login: str, password: str | None, schema: str, extra_dejson: dict ) -> Any: + """ + Get WebHDFS client. + + Additional options via ``extra``: + - use_ssl: enable SSL connection (default: False) + - verify: CA certificate path or boolean for SSL verification (default: False) + - cert: client certificate path for mTLS, can be combined cert or used with ``key`` + - key: client key path for mTLS with ``cert`` + """ connection_str = f"http://{namenode}" session = requests.Session() + if password is not None: session.auth = (login, password) @@ -114,6 +124,16 @@ def _get_client( connection_str = f"https://{namenode}" session.verify = extra_dejson.get("verify", False) + # Handle mTLS certificates + cert = extra_dejson.get("cert") + key = extra_dejson.get("key") + + if cert: + if key: + session.cert = (cert, key) + else: + session.cert = cert + if port is not None: connection_str += f":{port}" diff --git a/providers/tests/apache/hdfs/hooks/test_webhdfs.py b/providers/tests/apache/hdfs/hooks/test_webhdfs.py index 2e19b6401b118..25f608543e569 100644 --- a/providers/tests/apache/hdfs/hooks/test_webhdfs.py +++ b/providers/tests/apache/hdfs/hooks/test_webhdfs.py @@ -261,3 +261,50 @@ def test_conn_insecure_ssl_without_schema(self, socket_mock, mock_insecure_clien assert f"https://{connection.host}:{connection.port}" == mock_insecure_client.call_args.args[0] assert not mock_insecure_client.call_args.kwargs["session"].verify + + @patch("airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient") + @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket") + def test_conn_mtls_cert_and_key(self, socket_mock, mock_insecure_client): + """Test mTLS configuration with client cert and key""" + with patch( + "airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection", + return_value=Connection( + host="host_1", + port=123, + extra={ + "use_ssl": "True", + "cert": "/path/to/cert.pem", + "key": "/path/to/key.pem", + }, + ), + ) as mock_get_connection: + socket_mock.socket.return_value.connect_ex.return_value = 0 + self.webhdfs_hook.get_conn() + connection = mock_get_connection.return_value + + assert f"https://{connection.host}:{connection.port}" == mock_insecure_client.call_args.args[0] + assert ("/path/to/cert.pem", "/path/to/key.pem") == mock_insecure_client.call_args.kwargs[ + "session" + ].cert + + @patch("airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient") + @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket") + def test_conn_mtls_combined_cert(self, socket_mock, mock_insecure_client): + """Test mTLS configuration with combined client cert and key""" + with patch( + "airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection", + return_value=Connection( + host="host_1", + port=123, + extra={ + "use_ssl": "True", + "cert": "/path/to/combined.pem", + }, + ), + ) as mock_get_connection: + socket_mock.socket.return_value.connect_ex.return_value = 0 + self.webhdfs_hook.get_conn() + connection = mock_get_connection.return_value + + assert f"https://{connection.host}:{connection.port}" == mock_insecure_client.call_args.args[0] + assert ("/path/to/combined.pem") == mock_insecure_client.call_args.kwargs["session"].cert From 56996c3a8e29d7b1fa97f0b134a8513f65e1bd67 Mon Sep 17 00:00:00 2001 From: Mark Hatch <06.swivel-robots@icloud.com> Date: Mon, 2 Dec 2024 19:03:49 +0800 Subject: [PATCH 2/2] Static check fixes --- docs/spelling_wordlist.txt | 2 ++ providers/tests/apache/hdfs/hooks/test_webhdfs.py | 9 +++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index debbd43c896e4..e02e6cee380e1 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1058,6 +1058,8 @@ moto mouseover msg mssql +mTLS +mtls muldelete Multinamespace mutex diff --git a/providers/tests/apache/hdfs/hooks/test_webhdfs.py b/providers/tests/apache/hdfs/hooks/test_webhdfs.py index 0a83aa7479816..80b4cef76cad9 100644 --- a/providers/tests/apache/hdfs/hooks/test_webhdfs.py +++ b/providers/tests/apache/hdfs/hooks/test_webhdfs.py @@ -283,9 +283,10 @@ def test_conn_mtls_cert_and_key(self, socket_mock, mock_insecure_client): connection = mock_get_connection.return_value assert f"https://{connection.host}:{connection.port}" == mock_insecure_client.call_args.args[0] - assert ("/path/to/cert.pem", "/path/to/key.pem") == mock_insecure_client.call_args.kwargs[ - "session" - ].cert + assert mock_insecure_client.call_args.kwargs["session"].cert == ( + "/path/to/cert.pem", + "/path/to/key.pem", + ) @patch("airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient") @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket") @@ -307,4 +308,4 @@ def test_conn_mtls_combined_cert(self, socket_mock, mock_insecure_client): connection = mock_get_connection.return_value assert f"https://{connection.host}:{connection.port}" == mock_insecure_client.call_args.args[0] - assert ("/path/to/combined.pem") == mock_insecure_client.call_args.kwargs["session"].cert + assert mock_insecure_client.call_args.kwargs["session"].cert == ("/path/to/combined.pem")