Skip to content

Commit

Permalink
Fix SFTPSensor.newer_than not working with jinja logical ds/ts expres…
Browse files Browse the repository at this point in the history
…sion (apache#39056)

* Fixes apache#36629

* Fixes PR failed test

* Remove an parametrize duplicate tests

* Fix formatting

* Fix formatting

* Fixes apache#36629

* Fixes PR failed test

* Remove an parametrize duplicate tests

* update simple-salesforce type hints to support 1.12.6 (apache#39047)

* Fix formatting

* Add changelog for airflow python client 2.9.0 (apache#39060)

* Upgrade to latest hatchling as build dependency (apache#39044)

* Prepare docs 1st wave (RC3) + ad hoc April 2024 (apache#38995) (apache#39054)

* Prepare docs 1st wave (RC3) + ad hoc April 2024 (apache#38995)

* update databricks

* [docs] update `DagBag` class docstring to include all params (apache#38814)

* update docstring for DagBag class

* break long line

* fix space

Signed-off-by: kalyanr <kalyan.ben10@live.com>

---------

Signed-off-by: kalyanr <kalyan.ben10@live.com>

* Data aware scheduling docs edits (apache#38687)

* Moves airflow import in deprecated pod_generator to local (apache#39062)

The import might be invoked when K8S executor starts with sentry on
and it might lead to circular imports

Related: apache#31442

* KPO xcom sidecar PodDefault usage (apache#38951)

We should use the same, non deprecated, version of PodDefaults for the
xcom sidecar when creating and reading xcom.

* Fix formatting

* Change date/time parsing method for newer_than parameter un SFTPSensor

* Add examples in AWS auth manager documentation (apache#39040)

* update document (apache#39068)

* Update hatchling to version 1.24.0 (apache#39072)

* Check that the dataset<>task exists before trying to render graph (apache#39069)

* Change date/time parsing method for newer_than parameter un SFTPSensor

* Fix utc timezone in unit tests

* Fix utc timezone in unit tests

---------

Signed-off-by: kalyanr <kalyan.ben10@live.com>
Co-authored-by: Grégoire Rolland <gregoire.rolland@mymoneybank.com>
Co-authored-by: Hussein Awala <hussein@awala.fr>
Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>
Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
Co-authored-by: Elad Kalif <45845474+eladkal@users.noreply.github.com>
Co-authored-by: Kalyan <kalyan.ben10@live.com>
Co-authored-by: Laura Zdanski <25642903+lzdanski@users.noreply.github.com>
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
Co-authored-by: Vincent <97131062+vincbeck@users.noreply.github.com>
Co-authored-by: humit <jhjang1005@naver.com>
Co-authored-by: Brent Bovenzi <brent@astronomer.io>
  • Loading branch information
12 people authored and pateash committed May 13, 2024
1 parent 25f8017 commit 105979e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
8 changes: 5 additions & 3 deletions airflow/providers/sftp/sensors/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.providers.sftp.triggers.sftp import SFTPTrigger
from airflow.sensors.base import BaseSensorOperator, PokeReturnValue
from airflow.utils.timezone import convert_to_utc
from airflow.utils.timezone import convert_to_utc, parse

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand All @@ -57,7 +57,7 @@ def __init__(
*,
path: str,
file_pattern: str = "",
newer_than: datetime | None = None,
newer_than: datetime | str | None = None,
sftp_conn_id: str = "sftp_default",
python_callable: Callable | None = None,
op_args: list | None = None,
Expand All @@ -70,7 +70,7 @@ def __init__(
self.file_pattern = file_pattern
self.hook: SFTPHook | None = None
self.sftp_conn_id = sftp_conn_id
self.newer_than: datetime | None = newer_than
self.newer_than: datetime | str | None = newer_than
self.python_callable: Callable | None = python_callable
self.op_args = op_args or []
self.op_kwargs = op_kwargs or {}
Expand Down Expand Up @@ -105,6 +105,8 @@ def poke(self, context: Context) -> PokeReturnValue | bool:
continue

if self.newer_than:
if isinstance(self.newer_than, str):
self.newer_than = parse(self.newer_than)
_mod_time = convert_to_utc(datetime.strptime(mod_time, "%Y%m%d%H%M%S"))
_newer_than = convert_to_utc(self.newer_than)
if _newer_than <= _mod_time:
Expand Down
20 changes: 17 additions & 3 deletions tests/providers/sftp/sensors/test_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
from __future__ import annotations

from datetime import datetime
from datetime import datetime, timezone as stdlib_timezone
from unittest import mock
from unittest.mock import Mock, call, patch

Expand Down Expand Up @@ -97,11 +97,25 @@ def test_file_not_new_enough(self, sftp_hook_mock):
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt")
assert not output

@pytest.mark.parametrize(
"newer_than",
(
datetime(2020, 1, 2),
datetime(2020, 1, 2, tzinfo=stdlib_timezone.utc),
"2020-01-02",
"2020-01-02 00:00:00+00:00",
"2020-01-02 00:00:00.001+00:00",
"2020-01-02T00:00:00+00:00",
"2020-01-02T00:00:00Z",
"2020-01-02T00:00:00+04:00",
"2020-01-02T00:00:00.000001+04:00",
),
)
@patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
def test_naive_datetime(self, sftp_hook_mock):
def test_multiple_datetime_format_in_newer_than(self, sftp_hook_mock, newer_than):
sftp_hook_mock.return_value.get_mod_time.return_value = "19700101000000"
sftp_sensor = SFTPSensor(
task_id="unit_test", path="/path/to/file/1970-01-01.txt", newer_than=datetime(2020, 1, 2)
task_id="unit_test", path="/path/to/file/1970-01-01.txt", newer_than=newer_than
)
context = {"ds": "1970-01-00"}
output = sftp_sensor.poke(context)
Expand Down

0 comments on commit 105979e

Please sign in to comment.