Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove deprecations from SFTP Provider #44740

Merged
merged 6 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions providers/src/airflow/providers/sftp/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@
Changelog
---------

main
....

.. warning::
All deprecated classes, parameters and features have been removed from the sftp provider package.
The following breaking changes were introduced:

* Removed deprecated ``ssh_hook`` parameter from ``SFTPOperator``. Use ``sftp_hook`` instead.
* Removed deprecated ``ssh_hook`` parameter from ``SFTPHook``.
* Removed deprecated ``ftp_conn_id`` parameter from ``SFTPHook``. Use ``ssh_conn_id`` instead.




4.11.1
......

Expand Down
37 changes: 2 additions & 35 deletions providers/src/airflow/providers/sftp/hooks/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@
import datetime
import os
import stat
import warnings
from collections.abc import Sequence
from fnmatch import fnmatch
from typing import TYPE_CHECKING, Any, Callable

import asyncssh
from asgiref.sync import sync_to_async

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.providers.ssh.hooks.ssh import SSHHook

Expand Down Expand Up @@ -62,7 +61,6 @@ class SFTPHook(SSHHook):
For consistency reasons with SSHHook, the preferred parameter is "ssh_conn_id".

:param ssh_conn_id: The :ref:`sftp connection id<howto/connection:sftp>`
:param ssh_hook: Optional SSH hook (included to support passing of an SSH hook to the SFTP operator)
"""

conn_name_attr = "ssh_conn_id"
Expand All @@ -82,39 +80,12 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]:
def __init__(
self,
ssh_conn_id: str | None = "sftp_default",
ssh_hook: SSHHook | None = None,
host_proxy_cmd: str | None = None,
*args,
**kwargs,
) -> None:
self.conn: paramiko.SFTPClient | None = None

# TODO: remove support for ssh_hook when it is removed from SFTPOperator
self.ssh_hook = ssh_hook

if self.ssh_hook is not None:
warnings.warn(
"Parameter `ssh_hook` is deprecated and will be removed in a future version.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
if not isinstance(self.ssh_hook, SSHHook):
raise AirflowException(
f"ssh_hook must be an instance of SSHHook, but got {type(self.ssh_hook)}"
)
self.log.info("ssh_hook is provided. It will be used to generate SFTP connection.")
self.ssh_conn_id = self.ssh_hook.ssh_conn_id
return

ftp_conn_id = kwargs.pop("ftp_conn_id", None)
if ftp_conn_id:
warnings.warn(
"Parameter `ftp_conn_id` is deprecated. Please use `ssh_conn_id` instead.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
ssh_conn_id = ftp_conn_id

kwargs["ssh_conn_id"] = ssh_conn_id
kwargs["host_proxy_cmd"] = host_proxy_cmd
self.ssh_conn_id = ssh_conn_id
Expand All @@ -124,11 +95,7 @@ def __init__(
def get_conn(self) -> paramiko.SFTPClient: # type: ignore[override]
"""Open an SFTP connection to the remote host."""
if self.conn is None:
# TODO: remove support for ssh_hook when it is removed from SFTPOperator
if self.ssh_hook is not None:
self.conn = self.ssh_hook.get_conn().open_sftp()
else:
self.conn = super().get_conn().open_sftp()
self.conn = super().get_conn().open_sftp()
return self.conn

def close_conn(self) -> None:
Expand Down
43 changes: 6 additions & 37 deletions providers/src/airflow/providers/sftp/operators/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,15 @@

import os
import socket
import warnings
from collections.abc import Sequence
from pathlib import Path
from typing import Any

import paramiko

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.providers.ssh.hooks.ssh import SSHHook


class SFTPOperation:
Expand All @@ -48,15 +46,12 @@ class SFTPOperator(BaseOperator):
This operator uses sftp_hook to open sftp transport channel that serve as basis for file transfer.

:param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
or `sftp_hook` is provided.
from airflow Connections.
:param sftp_hook: predefined SFTPHook to use
Either `sftp_hook` or `ssh_conn_id` needs to be provided.
:param ssh_hook: Deprecated - predefined SSHHook to use for remote execution
Use `sftp_hook` instead.
:param remote_host: remote host to connect (templated)
Nullable. If provided, it will replace the `remote_host` which was
defined in `sftp_hook`/`ssh_hook` or predefined in the connection of `ssh_conn_id`.
defined in `sftp_hook` or predefined in the connection of `ssh_conn_id`.
:param local_filepath: local file path or list of local file paths to get or put. (templated)
:param remote_filepath: remote file path or list of remote file paths to get or put. (templated)
:param operation: specify operation 'get' or 'put', defaults to put
Expand Down Expand Up @@ -86,7 +81,6 @@ class SFTPOperator(BaseOperator):
def __init__(
self,
*,
ssh_hook: SSHHook | None = None,
sftp_hook: SFTPHook | None = None,
ssh_conn_id: str | None = None,
remote_host: str | None = None,
Expand All @@ -98,7 +92,6 @@ def __init__(
**kwargs,
) -> None:
super().__init__(**kwargs)
self.ssh_hook = ssh_hook
self.sftp_hook = sftp_hook
self.ssh_conn_id = ssh_conn_id
self.remote_host = remote_host
Expand Down Expand Up @@ -131,35 +124,13 @@ def execute(self, context: Any) -> str | list[str] | None:
f"expected {SFTPOperation.GET} or {SFTPOperation.PUT}."
)

# TODO: remove support for ssh_hook in next major provider version in hook and operator
if self.ssh_hook is not None and self.sftp_hook is not None:
raise AirflowException(
"Both `ssh_hook` and `sftp_hook` are defined. Please use only one of them."
)

if self.ssh_hook is not None:
if not isinstance(self.ssh_hook, SSHHook):
self.log.info("ssh_hook is invalid. Trying ssh_conn_id to create SFTPHook.")
self.sftp_hook = SFTPHook(ssh_conn_id=self.ssh_conn_id)
if self.sftp_hook is None:
warnings.warn(
"Parameter `ssh_hook` is deprecated. "
"Please use `sftp_hook` instead. "
"The old parameter `ssh_hook` will be removed in a future version.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
self.sftp_hook = SFTPHook(ssh_hook=self.ssh_hook)

file_msg = None
try:
if self.ssh_conn_id:
if self.sftp_hook and isinstance(self.sftp_hook, SFTPHook):
self.log.info("ssh_conn_id is ignored when sftp_hook/ssh_hook is provided.")
self.log.info("ssh_conn_id is ignored when sftp_hook is provided.")
else:
self.log.info(
"sftp_hook/ssh_hook not provided or invalid. Trying ssh_conn_id to create SFTPHook."
)
self.log.info("sftp_hook not provided or invalid. Trying ssh_conn_id to create SFTPHook.")
self.sftp_hook = SFTPHook(ssh_conn_id=self.ssh_conn_id)

if not self.sftp_hook:
Expand Down Expand Up @@ -217,7 +188,7 @@ def get_openlineage_facets_on_start(self):
exc_info=True,
)

hook = self.sftp_hook or self.ssh_hook or SFTPHook(ssh_conn_id=self.ssh_conn_id)
hook = self.sftp_hook or SFTPHook(ssh_conn_id=self.ssh_conn_id)

if self.remote_host is not None:
remote_host = self.remote_host
Expand All @@ -235,8 +206,6 @@ def get_openlineage_facets_on_start(self):

if hasattr(hook, "port"):
remote_port = hook.port
elif hasattr(hook, "ssh_hook"):
remote_port = hook.ssh_hook.port

# Since v4.1.0, SFTPOperator accepts both a string (single file) and a list of
# strings (multiple files) as local_filepath and remote_filepath, and internally
Expand Down
41 changes: 1 addition & 40 deletions providers/tests/sftp/hooks/test_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@
from asyncssh import SFTPAttrs, SFTPNoSuchFile
from asyncssh.sftp import SFTPName

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowException
from airflow.models import Connection
from airflow.providers.sftp.hooks.sftp import SFTPHook, SFTPHookAsync
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.utils.session import provide_session

pytestmark = pytest.mark.db_test
Expand Down Expand Up @@ -389,44 +388,6 @@ def test_connection_success(self, mock_get_connection):
assert status is True
assert msg == "Connection successfully tested"

@mock.patch("airflow.providers.sftp.hooks.sftp.SFTPHook.get_connection")
def test_deprecation_ftp_conn_id(self, mock_get_connection):
connection = Connection(conn_id="ftp_default", login="login", host="host")
mock_get_connection.return_value = connection
# If `ftp_conn_id` is provided, it will be used but would show a deprecation warning.
with pytest.warns(AirflowProviderDeprecationWarning, match=r"Parameter `ftp_conn_id` is deprecated"):
assert SFTPHook(ftp_conn_id="ftp_default").ssh_conn_id == "ftp_default"

# If both are provided, ftp_conn_id will be used but would show a deprecation warning.
with pytest.warns(AirflowProviderDeprecationWarning, match=r"Parameter `ftp_conn_id` is deprecated"):
assert (
SFTPHook(ftp_conn_id="ftp_default", ssh_conn_id="sftp_default").ssh_conn_id == "ftp_default"
)

# If `ssh_conn_id` is provided, it should use it for ssh_conn_id
assert SFTPHook(ssh_conn_id="sftp_default").ssh_conn_id == "sftp_default"
# Default is 'sftp_default
assert SFTPHook().ssh_conn_id == "sftp_default"

@mock.patch("airflow.providers.sftp.hooks.sftp.SFTPHook.get_connection")
def test_invalid_ssh_hook(self, mock_get_connection):
connection = Connection(conn_id="sftp_default", login="root", host="localhost")
mock_get_connection.return_value = connection
with (
pytest.raises(AirflowException, match="ssh_hook must be an instance of SSHHook"),
pytest.warns(AirflowProviderDeprecationWarning, match=r"Parameter `ssh_hook` is deprecated.*"),
):
SFTPHook(ssh_hook="invalid_hook")

@mock.patch("airflow.providers.ssh.hooks.ssh.SSHHook.get_connection")
def test_valid_ssh_hook(self, mock_get_connection):
connection = Connection(conn_id="sftp_test", login="root", host="localhost")
mock_get_connection.return_value = connection
with pytest.warns(AirflowProviderDeprecationWarning, match=r"Parameter `ssh_hook` is deprecated.*"):
hook = SFTPHook(ssh_hook=SSHHook(ssh_conn_id="sftp_test"))
assert hook.ssh_conn_id == "sftp_test"
assert isinstance(hook.get_conn(), paramiko.SFTPClient)

def test_get_suffix_pattern_match(self):
output = self.hook.get_file_by_pattern(self.temp_dir, "*.txt")
# In CI files might have different name, so we check that file found rather than actual name
Expand Down
Loading