diff --git a/providers/src/airflow/providers/apache/hdfs/sensors/web_hdfs.py b/providers/src/airflow/providers/apache/hdfs/sensors/web_hdfs.py index d82da2a15697b..14f622e6efedc 100644 --- a/providers/src/airflow/providers/apache/hdfs/sensors/web_hdfs.py +++ b/providers/src/airflow/providers/apache/hdfs/sensors/web_hdfs.py @@ -17,14 +17,15 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING, Any, Sequence, Union +from typing import TYPE_CHECKING, Any, Sequence from airflow.sensors.base import BaseSensorOperator if TYPE_CHECKING: - from airflow.utils.context import Context - from hdfs.ext.kerberos import KerberosClient from hdfs import InsecureClient + from hdfs.ext.kerberos import KerberosClient + + from airflow.utils.context import Context class WebHdfsSensor(BaseSensorOperator): @@ -50,8 +51,14 @@ class MultipleFilesWebHdfsSensor(BaseSensorOperator): template_fields: Sequence[str] = ("directory_path", "expected_filenames") - def __init__(self, *, directory_path: str, expected_filenames: Sequence[str], - webhdfs_conn_id: str = "webhdfs_default", **kwargs: Any) -> None: + def __init__( + self, + *, + directory_path: str, + expected_filenames: Sequence[str], + webhdfs_conn_id: str = "webhdfs_default", + **kwargs: Any, + ) -> None: super().__init__(**kwargs) self.directory_path = directory_path self.expected_filenames = expected_filenames @@ -61,7 +68,7 @@ def poke(self, context: Context) -> bool: from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook hook = WebHDFSHook(self.webhdfs_conn_id) - conn: 'KerberosClient | InsecureClient' = hook.get_conn() + conn: KerberosClient | InsecureClient = hook.get_conn() actual_files = set(conn.list(self.directory_path)) self.log.debug("Files Found in directory: %s", actual_files) diff --git a/providers/tests/apache/hdfs/sensors/test_web_hdfs.py b/providers/tests/apache/hdfs/sensors/test_web_hdfs.py index 4984140092214..41ccb88e28e18 100644 --- a/providers/tests/apache/hdfs/sensors/test_web_hdfs.py +++ b/providers/tests/apache/hdfs/sensors/test_web_hdfs.py @@ -20,7 +20,7 @@ import os from unittest import mock -from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor, MultipleFilesWebHdfsSensor +from airflow.providers.apache.hdfs.sensors.web_hdfs import MultipleFilesWebHdfsSensor, WebHdfsSensor TEST_HDFS_CONN = "webhdfs_default" TEST_HDFS_DIRECTORY = "hdfs://user/hive/warehouse/airflow.db" @@ -69,9 +69,11 @@ def test_poke(self, mock_hook, caplog): task_id="test_task", webhdfs_conn_id=TEST_HDFS_CONN, directory_path=TEST_HDFS_DIRECTORY, - expected_filenames=TEST_HDFS_FILENAMES + expected_filenames=TEST_HDFS_FILENAMES, ) - result = sensor.poke(dict()) + + with caplog.at_level("DEBUG", logger="airflow.task"): + result = sensor.poke(dict()) assert result assert "Files Found in directory: " in caplog.text @@ -88,12 +90,11 @@ def test_poke_should_return_false_for_missing_file(self, mock_hook, caplog): task_id="test_task", webhdfs_conn_id=TEST_HDFS_CONN, directory_path=TEST_HDFS_DIRECTORY, - expected_filenames=TEST_HDFS_FILENAMES + expected_filenames=TEST_HDFS_FILENAMES, ) exists = sensor.poke(dict()) assert not exists - assert "Files Found in directory: " in caplog.text assert "There are missing files: " in caplog.text mock_hook.return_value.get_conn.return_value.list.assert_called_once_with(TEST_HDFS_DIRECTORY)