Fixed failing static checks & provider tests #43122

Merged: 2 commits, Oct 17, 2024
providers/src/airflow/providers/apache/hdfs/sensors/web_hdfs.py (19 changes: 13 additions & 6 deletions)

@@ -17,14 +17,15 @@
 # under the License.
 from __future__ import annotations

-from typing import TYPE_CHECKING, Any, Sequence, Union
+from typing import TYPE_CHECKING, Any, Sequence

 from airflow.sensors.base import BaseSensorOperator

 if TYPE_CHECKING:
-    from airflow.utils.context import Context
-    from hdfs.ext.kerberos import KerberosClient
     from hdfs import InsecureClient
+    from hdfs.ext.kerberos import KerberosClient
+
+    from airflow.utils.context import Context


 class WebHdfsSensor(BaseSensorOperator):
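Note on the dropped Union import: the annotation in poke() below now uses PEP 604 union syntax (KerberosClient | InsecureClient), so typing.Union is no longer needed, and because it annotates a local variable it is never evaluated at runtime, which is why both client classes can stay behind TYPE_CHECKING. A minimal sketch of the pattern, with placeholder names not taken from this PR:

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only type checkers see this import; no runtime dependency on hdfs.
    from hdfs import InsecureClient

def make_client() -> InsecureClient:  # evaluation postponed by the __future__ import
    # Local variable annotations are never evaluated at runtime,
    # so the unquoted forward reference is safe here too.
    client: InsecureClient = ...  # a real implementation would build the client
    return client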
@@ -50,8 +51,14 @@ class MultipleFilesWebHdfsSensor(BaseSensorOperator):

     template_fields: Sequence[str] = ("directory_path", "expected_filenames")

-    def __init__(self, *, directory_path: str, expected_filenames: Sequence[str],
-                 webhdfs_conn_id: str = "webhdfs_default", **kwargs: Any) -> None:
+    def __init__(
+        self,
+        *,
+        directory_path: str,
+        expected_filenames: Sequence[str],
+        webhdfs_conn_id: str = "webhdfs_default",
+        **kwargs: Any,
+    ) -> None:
         super().__init__(**kwargs)
         self.directory_path = directory_path
         self.expected_filenames = expected_filenames
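For reference, the reformatted signature is keyword-only (note the bare *), so the sensor must be constructed with named arguments. A minimal instantiation sketch; the task id and filenames are illustrative, not taken from this PR:

from airflow.providers.apache.hdfs.sensors.web_hdfs import MultipleFilesWebHdfsSensor

sensor = MultipleFilesWebHdfsSensor(
    task_id="wait_for_exports",  # hypothetical task id
    directory_path="hdfs://user/hive/warehouse/airflow.db",
    expected_filenames=["part-0000", "part-0001"],  # hypothetical filenames
    webhdfs_conn_id="webhdfs_default",
)
# Positional construction, e.g. MultipleFilesWebHdfsSensor("path", [...]),
# raises TypeError: everything after * must be passed by keyword.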
@@ -61,7 +68,7 @@ def poke(self, context: Context) -> bool:
         from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

         hook = WebHDFSHook(self.webhdfs_conn_id)
-        conn: 'KerberosClient | InsecureClient' = hook.get_conn()
+        conn: KerberosClient | InsecureClient = hook.get_conn()

         actual_files = set(conn.list(self.directory_path))
         self.log.debug("Files Found in directory: %s", actual_files)
providers/tests/apache/hdfs/sensors/test_web_hdfs.py (11 changes: 6 additions & 5 deletions)

@@ -20,7 +20,7 @@
 import os
 from unittest import mock

-from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor, MultipleFilesWebHdfsSensor
+from airflow.providers.apache.hdfs.sensors.web_hdfs import MultipleFilesWebHdfsSensor, WebHdfsSensor

 TEST_HDFS_CONN = "webhdfs_default"
 TEST_HDFS_DIRECTORY = "hdfs://user/hive/warehouse/airflow.db"
@@ -69,9 +69,11 @@ def test_poke(self, mock_hook, caplog):
             task_id="test_task",
             webhdfs_conn_id=TEST_HDFS_CONN,
             directory_path=TEST_HDFS_DIRECTORY,
-            expected_filenames=TEST_HDFS_FILENAMES
+            expected_filenames=TEST_HDFS_FILENAMES,
         )
-        result = sensor.poke(dict())
+
+        with caplog.at_level("DEBUG", logger="airflow.task"):
+            result = sensor.poke(dict())

         assert result
         assert "Files Found in directory: " in caplog.text
@@ -88,12 +90,11 @@ def test_poke_should_return_false_for_missing_file(self, mock_hook, caplog):
             task_id="test_task",
             webhdfs_conn_id=TEST_HDFS_CONN,
             directory_path=TEST_HDFS_DIRECTORY,
-            expected_filenames=TEST_HDFS_FILENAMES
+            expected_filenames=TEST_HDFS_FILENAMES,
         )
         exists = sensor.poke(dict())

         assert not exists
-        assert "Files Found in directory: " in caplog.text
         assert "There are missing files: " in caplog.text

         mock_hook.return_value.get_conn.return_value.list.assert_called_once_with(TEST_HDFS_DIRECTORY)
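The final assertion walks a mock chain that mirrors the sensor's own calls. Because poke() imports WebHDFSHook inside the method body, the hook has to be patched where it is defined; the patch target below is an assumption, since the test's decorator sits outside this hunk:

from unittest import mock

with mock.patch("airflow.providers.apache.hdfs.hooks.webhdfs.WebHDFSHook") as mock_hook:
    # mock_hook.return_value             -> WebHDFSHook(self.webhdfs_conn_id)
    # .get_conn.return_value             -> conn = hook.get_conn()
    # .list                              -> conn.list(self.directory_path)
    mock_hook.return_value.get_conn.return_value.list.return_value = ["file1", "file2"]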