Commit

Fixed failing static checks & provider tests (#43122)
kaxil authored Oct 17, 2024
1 parent 2576eea · commit 571cf09
Showing 2 changed files with 19 additions and 11 deletions.
providers/src/airflow/providers/apache/hdfs/sensors/web_hdfs.py (19 changes: 13 additions & 6 deletions)
@@ -17,14 +17,15 @@
 # under the License.
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Any, Sequence, Union
+from typing import TYPE_CHECKING, Any, Sequence
 
 from airflow.sensors.base import BaseSensorOperator
 
 if TYPE_CHECKING:
-    from airflow.utils.context import Context
-    from hdfs.ext.kerberos import KerberosClient
     from hdfs import InsecureClient
+    from hdfs.ext.kerberos import KerberosClient
+
+    from airflow.utils.context import Context
 
 
 class WebHdfsSensor(BaseSensorOperator):
@@ -50,8 +51,14 @@ class MultipleFilesWebHdfsSensor(BaseSensorOperator):
 
     template_fields: Sequence[str] = ("directory_path", "expected_filenames")
 
-    def __init__(self, *, directory_path: str, expected_filenames: Sequence[str],
-                 webhdfs_conn_id: str = "webhdfs_default", **kwargs: Any) -> None:
+    def __init__(
+        self,
+        *,
+        directory_path: str,
+        expected_filenames: Sequence[str],
+        webhdfs_conn_id: str = "webhdfs_default",
+        **kwargs: Any,
+    ) -> None:
         super().__init__(**kwargs)
         self.directory_path = directory_path
         self.expected_filenames = expected_filenames
@@ -61,7 +68,7 @@ def poke(self, context: Context) -> bool:
         from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
 
         hook = WebHDFSHook(self.webhdfs_conn_id)
-        conn: 'KerberosClient | InsecureClient' = hook.get_conn()
+        conn: KerberosClient | InsecureClient = hook.get_conn()
 
         actual_files = set(conn.list(self.directory_path))
         self.log.debug("Files Found in directory: %s", actual_files)
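Note on the sensor change: the bare `*` in the reformatted `__init__` makes every argument keyword-only, and the unquoted `KerberosClient | InsecureClient` annotation is valid at runtime only because of `from __future__ import annotations` (which is why `Union` could be dropped from the typing import). A minimal usage sketch; the task id, path, and filenames below are hypothetical, only the sensor and its parameters come from the diff:

# Illustrative only: construct the sensor with keyword arguments.
from airflow.providers.apache.hdfs.sensors.web_hdfs import MultipleFilesWebHdfsSensor

wait_for_files = MultipleFilesWebHdfsSensor(
    task_id="wait_for_warehouse_files",                      # hypothetical
    directory_path="hdfs://user/hive/warehouse/airflow.db",  # hypothetical
    expected_filenames=["part-00000", "part-00001"],         # hypothetical
    webhdfs_conn_id="webhdfs_default",
)
# A positional call like MultipleFilesWebHdfsSensor("id", "/path", [...])
# now raises TypeError because of the bare * in the signature.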
providers/tests/apache/hdfs/sensors/test_web_hdfs.py (11 changes: 6 additions & 5 deletions)
@@ -20,7 +20,7 @@
 import os
 from unittest import mock
 
-from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor, MultipleFilesWebHdfsSensor
+from airflow.providers.apache.hdfs.sensors.web_hdfs import MultipleFilesWebHdfsSensor, WebHdfsSensor
 
 TEST_HDFS_CONN = "webhdfs_default"
 TEST_HDFS_DIRECTORY = "hdfs://user/hive/warehouse/airflow.db"
@@ -69,9 +69,11 @@ def test_poke(self, mock_hook, caplog):
             task_id="test_task",
             webhdfs_conn_id=TEST_HDFS_CONN,
             directory_path=TEST_HDFS_DIRECTORY,
-            expected_filenames=TEST_HDFS_FILENAMES
+            expected_filenames=TEST_HDFS_FILENAMES,
         )
-        result = sensor.poke(dict())
+
+        with caplog.at_level("DEBUG", logger="airflow.task"):
+            result = sensor.poke(dict())
 
         assert result
         assert "Files Found in directory: " in caplog.text
@@ -88,12 +90,11 @@ def test_poke_should_return_false_for_missing_file(self, mock_hook, caplog):
             task_id="test_task",
             webhdfs_conn_id=TEST_HDFS_CONN,
             directory_path=TEST_HDFS_DIRECTORY,
-            expected_filenames=TEST_HDFS_FILENAMES
+            expected_filenames=TEST_HDFS_FILENAMES,
         )
         exists = sensor.poke(dict())
 
         assert not exists
-        assert "Files Found in directory: " in caplog.text
         assert "There are missing files: " in caplog.text
 
         mock_hook.return_value.get_conn.return_value.list.assert_called_once_with(TEST_HDFS_DIRECTORY)
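Note on the test change: the sensor emits "Files Found in directory:" via self.log.debug on the airflow.task logger, whose effective level normally sits above DEBUG, so the record never reaches pytest's capture handler unless the level is lowered. That is consistent with the first test gaining the caplog.at_level context manager and the second dropping its DEBUG-level assertion. A standalone sketch of the pattern; the logger name and message come from the diff, the test itself is hypothetical and assumes pytest's default logging configuration with no Airflow logging setup:

import logging

def test_debug_record_reaches_caplog(caplog):
    log = logging.getLogger("airflow.task")
    log.debug("invisible")  # filtered: effective level is above DEBUG
    # Temporarily lower the logger to DEBUG so caplog sees the record.
    with caplog.at_level("DEBUG", logger="airflow.task"):
        log.debug("Files Found in directory: %s", {"file1"})  # captured
    assert "Files Found in directory: " in caplog.text
    assert "invisible" not in caplog.text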
