diff --git a/airflow/providers/sftp/decorators/__init__.py b/airflow/providers/sftp/decorators/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/sftp/decorators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/sftp/decorators/sensors/__init__.py b/airflow/providers/sftp/decorators/sensors/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/sftp/decorators/sensors/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/sftp/decorators/sensors/sftp.py b/airflow/providers/sftp/decorators/sensors/sftp.py new file mode 100644 index 0000000000000..990ebb53c9e8b --- /dev/null +++ b/airflow/providers/sftp/decorators/sensors/sftp.py @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Callable, Sequence + +from airflow.decorators.base import TaskDecorator, get_unique_task_id, task_decorator_factory +from airflow.providers.sftp.sensors.sftp import SFTPSensor + + +class _DecoratedSFTPSensorOperator(SFTPSensor): + """ + Wraps a Python callable and captures args/kwargs when called for execution. + + :param python_callable: A reference to an object that is callable + :param task_id: task Id + :param op_args: a list of positional arguments that will get unpacked when + calling your callable (templated) + :param op_kwargs: a dictionary of keyword arguments that will get unpacked + in your function (templated) + :param kwargs_to_upstream: For certain operators, we might need to upstream certain arguments + that would otherwise be absorbed by the DecoratedOperator (for example python_callable for the + PythonOperator). This gives a user the option to upstream kwargs as needed. + """ + + template_fields: Sequence[str] = ("op_args", "op_kwargs", *SFTPSensor.template_fields) + + custom_operator_name = "@task.sftp_sensor" + + # since we won't mutate the arguments, we should just do the shallow copy + # there are some cases we can't deepcopy the objects (e.g protobuf). + shallow_copy_attrs: Sequence[str] = ("python_callable",) + + def __init__( + self, + *, + task_id: str, + **kwargs, + ) -> None: + kwargs.pop("multiple_outputs") + kwargs["task_id"] = get_unique_task_id(task_id, kwargs.get("dag"), kwargs.get("task_group")) + super().__init__(**kwargs) + + +def sftp_sensor_task(python_callable: Callable | None = None, **kwargs) -> TaskDecorator: + """ + Wraps a function into an Airflow operator. + + Accepts kwargs for operator kwarg. Can be reused in a single DAG. + :param python_callable: Function to decorate + """ + return task_decorator_factory( + python_callable=python_callable, + multiple_outputs=False, + decorated_operator_class=_DecoratedSFTPSensorOperator, + **kwargs, + ) diff --git a/airflow/providers/sftp/provider.yaml b/airflow/providers/sftp/provider.yaml index ec8adb1b7b687..84b8dd738fade 100644 --- a/airflow/providers/sftp/provider.yaml +++ b/airflow/providers/sftp/provider.yaml @@ -68,6 +68,7 @@ sensors: - integration-name: SSH File Transfer Protocol (SFTP) python-modules: - airflow.providers.sftp.sensors.sftp + - airflow.providers.sftp.decorators.sensors.sftp hooks: - integration-name: SSH File Transfer Protocol (SFTP) @@ -78,3 +79,7 @@ hooks: connection-types: - hook-class-name: airflow.providers.sftp.hooks.sftp.SFTPHook connection-type: sftp + +task-decorators: + - class-name: airflow.providers.sftp.decorators.sensors.sftp.sftp_sensor_task + name: sftp_sensor diff --git a/airflow/providers/sftp/sensors/sftp.py b/airflow/providers/sftp/sensors/sftp.py index 8a84ee2f14458..82baab2bce5ad 100644 --- a/airflow/providers/sftp/sensors/sftp.py +++ b/airflow/providers/sftp/sensors/sftp.py @@ -20,12 +20,12 @@ import os from datetime import datetime -from typing import TYPE_CHECKING, Sequence +from typing import TYPE_CHECKING, Any, Callable, Sequence from paramiko.sftp import SFTP_NO_SUCH_FILE from airflow.providers.sftp.hooks.sftp import SFTPHook -from airflow.sensors.base import BaseSensorOperator +from airflow.sensors.base import BaseSensorOperator, PokeReturnValue from airflow.utils.timezone import convert_to_utc if TYPE_CHECKING: @@ -54,6 +54,9 @@ def __init__( file_pattern: str = "", newer_than: datetime | None = None, sftp_conn_id: str = "sftp_default", + python_callable: Callable | None = None, + op_args: list | None = None, + op_kwargs: dict[str, Any] | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -62,10 +65,14 @@ def __init__( self.hook: SFTPHook | None = None self.sftp_conn_id = sftp_conn_id self.newer_than: datetime | None = newer_than + self.python_callable: Callable | None = python_callable + self.op_args = op_args or [] + self.op_kwargs = op_kwargs or {} - def poke(self, context: Context) -> bool: + def poke(self, context: Context) -> PokeReturnValue | bool: self.hook = SFTPHook(self.sftp_conn_id) self.log.info("Poking for %s, with pattern %s", self.path, self.file_pattern) + files_found = [] if self.file_pattern: files_from_pattern = self.hook.get_files_by_pattern(self.path, self.file_pattern) @@ -89,8 +96,18 @@ def poke(self, context: Context) -> bool: _mod_time = convert_to_utc(datetime.strptime(mod_time, "%Y%m%d%H%M%S")) _newer_than = convert_to_utc(self.newer_than) if _newer_than <= _mod_time: - return True + files_found.append(actual_file_to_check) else: - return True + files_found.append(actual_file_to_check) self.hook.close_conn() - return False + if not len(files_found): + return False + if self.python_callable is not None: + if self.op_kwargs: + self.op_kwargs["files_found"] = files_found + callable_return = self.python_callable(*self.op_args, **self.op_kwargs) + return PokeReturnValue( + is_done=True, + xcom_value={"files_found": files_found, "decorator_return_value": callable_return}, + ) + return True diff --git a/docs/apache-airflow-providers-sftp/index.rst b/docs/apache-airflow-providers-sftp/index.rst index 83031f470f73a..72fbf2546b7e8 100644 --- a/docs/apache-airflow-providers-sftp/index.rst +++ b/docs/apache-airflow-providers-sftp/index.rst @@ -35,6 +35,7 @@ :caption: References Connection types + Sensors Python API <_api/airflow/providers/sftp/index> .. toctree:: @@ -45,6 +46,12 @@ PyPI Repository Installing from sources +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: System tests + + System Tests <_api/tests/system/providers/sftp/index> .. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME! diff --git a/docs/apache-airflow-providers-sftp/sensors/sftp_sensor.rst b/docs/apache-airflow-providers-sftp/sensors/sftp_sensor.rst new file mode 100644 index 0000000000000..70294974f5b32 --- /dev/null +++ b/docs/apache-airflow-providers-sftp/sensors/sftp_sensor.rst @@ -0,0 +1,46 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +SFTP Sensor +=========== + +Looks for either a specific file or files with a specific pattern in a server using SFTP protocol. +To get more information about this sensor visit :class:`~airflow.providers.sftp.sensors.sftp.SFTPSensor` + +.. exampleinclude:: /../../tests/system/providers/sftp/example_sftp_sensor.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_sftp_sensor] + :end-before: [END howto_operator_sftp_sensor] + + +We can also use Taskflow API. It takes the same arguments as the :class:`~airflow.providers.sftp.sensors.sftp.SFTPSensor` along with - + +op_args (optional) + A list of positional arguments that will get unpacked when + calling your callable (templated) +op_kwargs (optional) + A dictionary of keyword arguments that will get unpacked + in your function (templated) + +Whatever returned by the python callable is put into XCom. + +.. exampleinclude:: /../../tests/system/providers/sftp/example_sftp_sensor.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_sftp_sensor_decorator] + :end-before: [END howto_operator_sftp_sensor_decorator] diff --git a/tests/providers/sftp/decorators/__init__.py b/tests/providers/sftp/decorators/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/sftp/decorators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/sftp/decorators/sensors/__init__.py b/tests/providers/sftp/decorators/sensors/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/sftp/decorators/sensors/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/sftp/decorators/sensors/test_sftp.py b/tests/providers/sftp/decorators/sensors/test_sftp.py new file mode 100644 index 0000000000000..c79cd8ed3dc67 --- /dev/null +++ b/tests/providers/sftp/decorators/sensors/test_sftp.py @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from airflow.decorators import task +from airflow.utils import timezone + +DEFAULT_DATE = timezone.datetime(2021, 9, 9) + + +class TestSFTPDecoratorSensor: + @pytest.mark.parametrize( + "file_path,", + ["/path/to/file/2021-09-09.txt", "/path/to/file/{{ ds }}.txt"], + ) + @patch("airflow.providers.sftp.sensors.sftp.SFTPHook") + def test_decorator_with_file_path_with_template(self, sftp_hook_mock, file_path, dag_maker): + sftp_hook_mock.return_value.get_mod_time.return_value = "19700101000000" + file_path_templated = file_path + file_path = "/path/to/file/2021-09-09.txt" + decorated_func_return = "decorated_func_returns" + expected_xcom_return = {"files_found": [file_path], "decorator_return_value": decorated_func_return} + + @task.sftp_sensor(path=file_path_templated) + def f(): + return decorated_func_return + + with dag_maker(): + ret = f() + + dr = dag_maker.create_dagrun() + ret.operator.run(start_date=dr.execution_date, end_date=dr.execution_date) + ti = dr.get_task_instances()[0] + assert ti.xcom_pull() == expected_xcom_return + + @patch("airflow.providers.sftp.sensors.sftp.SFTPHook") + def test_decorator_with_file_path_with_args(self, sftp_hook_mock, dag_maker): + sftp_hook_mock.return_value.get_mod_time.return_value = "19700101000000" + file_path = "/path/to/file/1970-01-01.txt" + op_args = ["op_args_1"] + op_kwargs = {"key": "value"} + decorated_func_return = {"args": op_args, "kwargs": {**op_kwargs, "files_found": [file_path]}} + expected_xcom_return = {"files_found": [file_path], "decorator_return_value": decorated_func_return} + + @task.sftp_sensor(path=file_path) + def f(*args, **kwargs): + return {"args": args, "kwargs": kwargs} + + with dag_maker(): + ret = f(*op_args, **op_kwargs) + + dr = dag_maker.create_dagrun() + ret.operator.run(start_date=dr.execution_date, end_date=dr.execution_date) + ti = dr.get_task_instances()[0] + assert ti.xcom_pull() == expected_xcom_return + + @patch("airflow.providers.sftp.sensors.sftp.SFTPHook") + def test_decorator_with_file_pattern(self, sftp_hook_mock, dag_maker): + sftp_hook_mock.return_value.get_mod_time.return_value = "19700101000000" + file_path_list = ["/path/to/file/text_file.txt", "/path/to/file/another_text_file.txt"] + sftp_hook_mock.return_value.get_files_by_pattern.return_value = [ + "text_file.txt", + "another_text_file.txt", + ] + decorated_func_return = "decorated_func_returns" + expected_xcom_return = { + "files_found": file_path_list, + "decorator_return_value": decorated_func_return, + } + + @task.sftp_sensor(path="/path/to/file/", file_pattern=".txt") + def f(): + return decorated_func_return + + with dag_maker(): + ret = f() + + dr = dag_maker.create_dagrun() + ret.operator.run(start_date=dr.execution_date, end_date=dr.execution_date) + ti = dr.get_task_instances()[0] + assert ti.xcom_pull() == expected_xcom_return + + @patch("airflow.providers.sftp.sensors.sftp.SFTPHook") + def test_decorator_with_file_pattern_with_args(self, sftp_hook_mock, dag_maker): + sftp_hook_mock.return_value.get_mod_time.return_value = "19700101000000" + file_path_list = ["/path/to/file/text_file.txt", "/path/to/file/another_text_file.txt"] + op_args = ["op_args_1"] + op_kwargs = {"key": "value"} + sftp_hook_mock.return_value.get_files_by_pattern.return_value = [ + "text_file.txt", + "another_text_file.txt", + ] + decorated_func_return = {"args": op_args, "kwargs": {**op_kwargs, "files_found": file_path_list}} + expected_xcom_return = { + "files_found": file_path_list, + "decorator_return_value": decorated_func_return, + } + + @task.sftp_sensor(path="/path/to/file/", file_pattern=".txt") + def f(*args, **kwargs): + return {"args": args, "kwargs": kwargs} + + with dag_maker(): + ret = f(*op_args, **op_kwargs) + + dr = dag_maker.create_dagrun() + ret.operator.run(start_date=dr.execution_date, end_date=dr.execution_date) + ti = dr.get_task_instances()[0] + assert ti.xcom_pull() == expected_xcom_return diff --git a/tests/providers/sftp/sensors/test_sftp.py b/tests/providers/sftp/sensors/test_sftp.py index 6895c158c8caa..fa4b6e35bf125 100644 --- a/tests/providers/sftp/sensors/test_sftp.py +++ b/tests/providers/sftp/sensors/test_sftp.py @@ -19,13 +19,14 @@ from datetime import datetime from unittest import mock -from unittest.mock import patch +from unittest.mock import Mock, call, patch import pytest from paramiko.sftp import SFTP_FAILURE, SFTP_NO_SUCH_FILE from pendulum import datetime as pendulum_datetime, timezone from airflow.providers.sftp.sensors.sftp import SFTPSensor +from airflow.sensors.base import PokeReturnValue class TestSFTPSensor: @@ -118,19 +119,6 @@ def test_file_not_present_with_pattern(self, sftp_hook_mock): output = sftp_sensor.poke(context) assert not output - @patch("airflow.providers.sftp.sensors.sftp.SFTPHook") - def test_multiple_file_present_with_pattern(self, sftp_hook_mock): - sftp_hook_mock.return_value.get_mod_time.return_value = "19700101000000" - sftp_hook_mock.return_value.get_files_by_pattern.return_value = [ - "text_file.txt", - "another_text_file.txt", - ] - sftp_sensor = SFTPSensor(task_id="unit_test", path="/path/to/file/", file_pattern="*.txt") - context = {"ds": "1970-01-01"} - output = sftp_sensor.poke(context) - sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/text_file.txt") - assert output - @patch("airflow.providers.sftp.sensors.sftp.SFTPHook") def test_multiple_files_present_with_pattern(self, sftp_hook_mock): sftp_hook_mock.return_value.get_mod_time.return_value = "19700101000000" @@ -141,7 +129,9 @@ def test_multiple_files_present_with_pattern(self, sftp_hook_mock): sftp_sensor = SFTPSensor(task_id="unit_test", path="/path/to/file/", file_pattern="*.txt") context = {"ds": "1970-01-01"} output = sftp_sensor.poke(context) - sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/text_file.txt") + get_mod_time = sftp_hook_mock.return_value.get_mod_time + expected_calls = [call("/path/to/file/text_file.txt"), call("/path/to/file/another_text_file.txt")] + assert get_mod_time.mock_calls == expected_calls assert output @patch("airflow.providers.sftp.sensors.sftp.SFTPHook") @@ -199,3 +189,69 @@ def test_multiple_old_files_present_with_pattern_and_newer_than(self, sftp_hook_ ] ) assert not output + + @pytest.mark.parametrize( + "op_args, op_kwargs,", + [ + pytest.param(("op_arg_1",), {"key": "value"}), + pytest.param((), {}), + ], + ) + @patch("airflow.providers.sftp.sensors.sftp.SFTPHook") + def test_file_path_present_with_callback(self, sftp_hook_mock, op_args, op_kwargs): + sftp_hook_mock.return_value.get_mod_time.return_value = "19700101000000" + sample_callable = Mock() + sample_callable.return_value = ["sample_return"] + sftp_sensor = SFTPSensor( + task_id="unit_test", + path="/path/to/file/1970-01-01.txt", + python_callable=sample_callable, + op_args=op_args, + op_kwargs=op_kwargs, + ) + context = {"ds": "1970-01-01"} + output = sftp_sensor.poke(context) + + sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt") + sample_callable.assert_called_once_with(*op_args, **op_kwargs) + assert isinstance(output, PokeReturnValue) + assert output.is_done + assert output.xcom_value == { + "files_found": ["/path/to/file/1970-01-01.txt"], + "decorator_return_value": ["sample_return"], + } + + @pytest.mark.parametrize( + "op_args, op_kwargs,", + [ + pytest.param(("op_arg_1",), {"key": "value"}), + pytest.param((), {}), + ], + ) + @patch("airflow.providers.sftp.sensors.sftp.SFTPHook") + def test_file_pattern_present_with_callback(self, sftp_hook_mock, op_args, op_kwargs): + sftp_hook_mock.return_value.get_mod_time.return_value = "19700101000000" + sample_callable = Mock() + sample_callable.return_value = ["sample_return"] + sftp_hook_mock.return_value.get_files_by_pattern.return_value = [ + "text_file.txt", + "another_text_file.txt", + ] + sftp_sensor = SFTPSensor( + task_id="unit_test", + path="/path/to/file/", + file_pattern=".txt", + python_callable=sample_callable, + op_args=op_args, + op_kwargs=op_kwargs, + ) + context = {"ds": "1970-01-01"} + output = sftp_sensor.poke(context) + + sample_callable.assert_called_once_with(*op_args, **op_kwargs) + assert isinstance(output, PokeReturnValue) + assert output.is_done + assert output.xcom_value == { + "files_found": ["/path/to/file/text_file.txt", "/path/to/file/another_text_file.txt"], + "decorator_return_value": ["sample_return"], + } diff --git a/tests/system/providers/sftp/__init__.py b/tests/system/providers/sftp/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/system/providers/sftp/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/system/providers/sftp/example_sftp_sensor.py b/tests/system/providers/sftp/example_sftp_sensor.py new file mode 100644 index 0000000000000..f3a6bc253c7fb --- /dev/null +++ b/tests/system/providers/sftp/example_sftp_sensor.py @@ -0,0 +1,89 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import os +from datetime import datetime + +from airflow.decorators import task +from airflow.models import DAG +from airflow.providers.sftp.sensors.sftp import SFTPSensor +from airflow.providers.ssh.operators.ssh import SSHOperator + +SFTP_DIRECTORY = os.environ.get("SFTP_DIRECTORY", "example-empty-directory").rstrip("/") + "/" +FULL_FILE_PATH = f"{SFTP_DIRECTORY}example_test_sftp_sensor_decory_file.txt" +SFTP_DEFAULT_CONNECTION = "sftp_default" + + +@task.python +def sleep_function(): + import time + + time.sleep(60) + + +with DAG( + "example_sftp_sensor", + schedule="@once", + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example", "sftp"], +) as dag: + + # [START howto_operator_sftp_sensor_decorator] + @task.sftp_sensor(task_id="sftp_sensor", path=FULL_FILE_PATH, poke_interval=10) + def sftp_sensor_decorator(): + print("Files were successfully found!") + # add your logic + return "done" + + # [END howto_operator_sftp_sensor_decorator] + + remove_file_task_start = SSHOperator( + task_id="remove_file_start", + command=f"rm {FULL_FILE_PATH} || true", + ssh_conn_id=SFTP_DEFAULT_CONNECTION, + ) + remove_file_task_end = SSHOperator( + task_id="remove_file_end", command=f"rm {FULL_FILE_PATH} || true", ssh_conn_id=SFTP_DEFAULT_CONNECTION + ) + create_decoy_file_task = SSHOperator( + task_id="create_file", command=f"touch {FULL_FILE_PATH}", ssh_conn_id=SFTP_DEFAULT_CONNECTION + ) + sleep_task = sleep_function() + sftp_with_sensor = sftp_sensor_decorator() + + # [START howto_operator_sftp_sensor] + sftp_with_operator = SFTPSensor(task_id="sftp_operator", path=FULL_FILE_PATH, poke_interval=10) + # [END howto_operator_sftp_sensor] + + remove_file_task_start >> sleep_task >> create_decoy_file_task + remove_file_task_start >> [sftp_with_operator, sftp_with_sensor] >> remove_file_task_end + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)