Skip to content
Merged
28 changes: 14 additions & 14 deletions providers/sftp/src/airflow/providers/sftp/sensors/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,25 +89,25 @@ def poke(self, context: Context) -> PokeReturnValue | bool:
if self.file_pattern:
files_from_pattern = self.hook.get_files_by_pattern(self.path, self.file_pattern)
if files_from_pattern:
actual_files_to_check = [
actual_files_present = [
os.path.join(self.path, file_from_pattern) for file_from_pattern in files_from_pattern
]
else:
return False
else:
try:
self.hook.isfile(self.path)
actual_files_to_check = [self.path]
except OSError as e:
if e.errno != SFTP_NO_SUCH_FILE:
raise AirflowException from e
actual_files_to_check = []
# If a regular file is present at self.path, it becomes the single element of the
# actual_files_present list to be processed. If the path is a directory, actual_files_present
# is assigned an empty list, since SFTPHook.isfile(...) returns False
actual_files_present = [self.path] if self.hook.isfile(self.path) else []
except Exception as e:
raise AirflowException from e

if self.newer_than:
for actual_file_to_check in actual_files_to_check:
for actual_file_present in actual_files_present:
try:
mod_time = self.hook.get_mod_time(actual_file_to_check)
self.log.info("Found File %s last modified: %s", actual_file_to_check, mod_time)
mod_time = self.hook.get_mod_time(actual_file_present)
self.log.info("Found File %s last modified: %s", actual_file_present, mod_time)
except OSError as e:
if e.errno != SFTP_NO_SUCH_FILE:
raise AirflowException from e
Expand All @@ -118,22 +118,22 @@ def poke(self, context: Context) -> PokeReturnValue | bool:
_mod_time = convert_to_utc(datetime.strptime(mod_time, "%Y%m%d%H%M%S"))
_newer_than = convert_to_utc(self.newer_than)
if _newer_than <= _mod_time:
files_found.append(actual_file_to_check)
files_found.append(actual_file_present)
self.log.info(
"File %s has modification time: '%s', which is newer than: '%s'",
actual_file_to_check,
actual_file_present,
str(_mod_time),
str(_newer_than),
)
else:
self.log.info(
"File %s has modification time: '%s', which is older than: '%s'",
actual_file_to_check,
actual_file_present,
str(_mod_time),
str(_newer_than),
)
else:
files_found = actual_files_to_check
files_found = actual_files_present

if not len(files_found):
return False
Expand Down
17 changes: 15 additions & 2 deletions providers/sftp/tests/unit/sftp/sensors/test_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from unittest.mock import Mock, patch

import pytest
from paramiko.sftp import SFTP_FAILURE, SFTP_NO_SUCH_FILE
from paramiko.sftp import SFTP_FAILURE
from pendulum import datetime as pendulum_datetime, timezone

from airflow.exceptions import AirflowException
Expand All @@ -44,9 +44,22 @@ def test_file_present(self, sftp_hook_mock):
sftp_hook_mock.return_value.close_conn.assert_not_called()
assert output

@patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
def test_file_irregular(self, sftp_hook_mock):
    """Poke yields a falsy result when the hook's ``isfile`` check reports False."""
    target_path = "/path/to/file/1970-01-01.txt"
    hook_instance = sftp_hook_mock.return_value

    # Mock SFTPHook.isfile returning False, which (per the original test note) is
    # what that method does when an OSError is raised inside it.
    hook_instance.isfile.return_value = False

    sensor = SFTPSensor(task_id="unit_test", path=target_path)
    poke_result = sensor.poke({"ds": "1970-01-01"})

    # The sensor must have probed exactly the configured path, and must not have
    # closed the hook connection while poking.
    hook_instance.isfile.assert_called_once_with(target_path)
    hook_instance.close_conn.assert_not_called()
    assert not poke_result

@patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
def test_file_absent(self, sftp_hook_mock):
sftp_hook_mock.return_value.isfile.side_effect = OSError(SFTP_NO_SUCH_FILE, "File missing")
# This uses the same mock setup as the test above, but simulates the absence of a file instead
sftp_hook_mock.return_value.isfile.return_value = False
sftp_sensor = SFTPSensor(task_id="unit_test", path="/path/to/file/1970-01-01.txt")
context = {"ds": "1970-01-01"}
output = sftp_sensor.poke(context)
Expand Down
Loading