diff --git a/providers/sftp/src/airflow/providers/sftp/sensors/sftp.py b/providers/sftp/src/airflow/providers/sftp/sensors/sftp.py index df4c7c5a6110d..98a47e22539b2 100644 --- a/providers/sftp/src/airflow/providers/sftp/sensors/sftp.py +++ b/providers/sftp/src/airflow/providers/sftp/sensors/sftp.py @@ -89,25 +89,25 @@ def poke(self, context: Context) -> PokeReturnValue | bool: if self.file_pattern: files_from_pattern = self.hook.get_files_by_pattern(self.path, self.file_pattern) if files_from_pattern: - actual_files_to_check = [ + actual_files_present = [ os.path.join(self.path, file_from_pattern) for file_from_pattern in files_from_pattern ] else: return False else: try: - self.hook.isfile(self.path) - actual_files_to_check = [self.path] - except OSError as e: - if e.errno != SFTP_NO_SUCH_FILE: - raise AirflowException from e - actual_files_to_check = [] + # If a file is present, it is the single element added to the actual_files_present list to be + # processed. If the file is a directory, actual_file_present will be assigned an empty list, + # since SFTPHook.isfile(...) returns False + actual_files_present = [self.path] if self.hook.isfile(self.path) else [] + except Exception as e: + raise AirflowException from e if self.newer_than: - for actual_file_to_check in actual_files_to_check: + for actual_file_present in actual_files_present: try: - mod_time = self.hook.get_mod_time(actual_file_to_check) - self.log.info("Found File %s last modified: %s", actual_file_to_check, mod_time) + mod_time = self.hook.get_mod_time(actual_file_present) + self.log.info("Found File %s last modified: %s", actual_file_present, mod_time) except OSError as e: if e.errno != SFTP_NO_SUCH_FILE: raise AirflowException from e @@ -118,22 +118,22 @@ def poke(self, context: Context) -> PokeReturnValue | bool: _mod_time = convert_to_utc(datetime.strptime(mod_time, "%Y%m%d%H%M%S")) _newer_than = convert_to_utc(self.newer_than) if _newer_than <= _mod_time: - files_found.append(actual_file_to_check) + files_found.append(actual_file_present) self.log.info( "File %s has modification time: '%s', which is newer than: '%s'", - actual_file_to_check, + actual_file_present, str(_mod_time), str(_newer_than), ) else: self.log.info( "File %s has modification time: '%s', which is older than: '%s'", - actual_file_to_check, + actual_file_present, str(_mod_time), str(_newer_than), ) else: - files_found = actual_files_to_check + files_found = actual_files_present if not len(files_found): return False diff --git a/providers/sftp/tests/unit/sftp/sensors/test_sftp.py b/providers/sftp/tests/unit/sftp/sensors/test_sftp.py index 644b876c36783..2409355ead2a2 100644 --- a/providers/sftp/tests/unit/sftp/sensors/test_sftp.py +++ b/providers/sftp/tests/unit/sftp/sensors/test_sftp.py @@ -22,7 +22,7 @@ from unittest.mock import Mock, patch import pytest -from paramiko.sftp import SFTP_FAILURE, SFTP_NO_SUCH_FILE +from paramiko.sftp import SFTP_FAILURE from pendulum import datetime as pendulum_datetime, timezone from airflow.exceptions import AirflowException @@ -44,9 +44,22 @@ def test_file_present(self, sftp_hook_mock): sftp_hook_mock.return_value.close_conn.assert_not_called() assert output + @patch("airflow.providers.sftp.sensors.sftp.SFTPHook") + def test_file_irregular(self, sftp_hook_mock): + # This mocks the behavior of SFTPHook.isfile when an OSError is raised in that method, resulting in + # False being returned + sftp_hook_mock.return_value.isfile.return_value = False + sftp_sensor = SFTPSensor(task_id="unit_test", path="/path/to/file/1970-01-01.txt") + context = {"ds": "1970-01-01"} + output = sftp_sensor.poke(context) + sftp_hook_mock.return_value.isfile.assert_called_once_with("/path/to/file/1970-01-01.txt") + sftp_hook_mock.return_value.close_conn.assert_not_called() + assert not output + @patch("airflow.providers.sftp.sensors.sftp.SFTPHook") def test_file_absent(self, sftp_hook_mock): - sftp_hook_mock.return_value.isfile.side_effect = OSError(SFTP_NO_SUCH_FILE, "File missing") + # This is the same implementation above, however, it's simulating instead the absence of a file + sftp_hook_mock.return_value.isfile.return_value = False sftp_sensor = SFTPSensor(task_id="unit_test", path="/path/to/file/1970-01-01.txt") context = {"ds": "1970-01-01"} output = sftp_sensor.poke(context)