diff --git a/providers/apache/hdfs/docs/connections.rst b/providers/apache/hdfs/docs/connections.rst index 28b557c46378c..c54cd82997802 100644 --- a/providers/apache/hdfs/docs/connections.rst +++ b/providers/apache/hdfs/docs/connections.rst @@ -44,4 +44,6 @@ Extra (optional, connection parameters) * ``use_ssl`` - If SSL should be used. By default is set to `false`. * ``verify`` - How to verify SSL. For more information refer to https://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification. * ``cert`` - Client certificate path for mTLS, can be combined cert or used with ``key`` - * ``key`` - Client key path for mTLS with ``cert`` + * ``key`` - Client key path for mTLS with ``cert``. + * ``cookies`` - Add cookies to session. + * ``headers`` - Add headers to session. diff --git a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/hooks/webhdfs.py b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/hooks/webhdfs.py index 43f3fae6aa6ad..f84588df54fe5 100644 --- a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/hooks/webhdfs.py +++ b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/hooks/webhdfs.py @@ -133,6 +133,14 @@ def _get_client( else: session.cert = cert + cookies = extra_dejson.get("cookies", False) + if cookies: + session.cookies.update(cookies) + + headers = extra_dejson.get("headers", False) + if extra_dejson.get("headers", False): + session.headers.update(headers) + if port is not None: connection_str += f":{port}" diff --git a/providers/apache/hdfs/tests/unit/apache/hdfs/hooks/test_webhdfs.py b/providers/apache/hdfs/tests/unit/apache/hdfs/hooks/test_webhdfs.py index 80b4cef76cad9..16d4e81008f5a 100644 --- a/providers/apache/hdfs/tests/unit/apache/hdfs/hooks/test_webhdfs.py +++ b/providers/apache/hdfs/tests/unit/apache/hdfs/hooks/test_webhdfs.py @@ -309,3 +309,25 @@ def test_conn_mtls_combined_cert(self, socket_mock, mock_insecure_client): assert f"https://{connection.host}:{connection.port}" == mock_insecure_client.call_args.args[0] assert mock_insecure_client.call_args.kwargs["session"].cert == ("/path/to/combined.pem") + + @patch("airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient") + @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket") + def test_conn_cookies(self, socket_mock, mock_insecure_client): + with patch( + "airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection", + return_value=Connection(host="host_1", port=123, extra={"cookies": {"my": "cookies"}}), + ): + socket_mock.socket.return_value.connect_ex.return_value = 0 + self.webhdfs_hook.get_conn() + assert mock_insecure_client.call_args.kwargs["session"].cookies.get("my") == "cookies" + + @patch("airflow.providers.apache.hdfs.hooks.webhdfs.InsecureClient") + @patch("airflow.providers.apache.hdfs.hooks.webhdfs.socket") + def test_conn_headers(self, socket_mock, mock_insecure_client): + with patch( + "airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook.get_connection", + return_value=Connection(host="host_1", port=123, extra={"headers": {"my": "headers"}}), + ): + socket_mock.socket.return_value.connect_ex.return_value = 0 + self.webhdfs_hook.get_conn() + assert mock_insecure_client.call_args.kwargs["session"].headers.get("my") == "headers"