Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions providers/imap/src/airflow/providers/imap/hooks/imap.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,27 @@ def _build_client(self, conn: Connection) -> imaplib.IMAP4_SSL | imaplib.IMAP4:
return mail_client

def has_mail_attachment(
self, name: str, *, check_regex: bool = False, mail_folder: str = "INBOX", mail_filter: str = "All"
self,
name: str,
*,
check_regex: bool = False,
max_mails: int | None = None,
mail_folder: str = "INBOX",
mail_filter: str = "All",
) -> bool:
"""
Check the mail folder for mails containing attachments with the given name.

:param name: The name of the attachment that will be searched for.
:param check_regex: Checks the name for a regular expression.
:param max_mails: Maximum number of latest emails to process. Must be a positive integer, or None when unlimited. Defaults to None.
:param mail_folder: The mail folder where to look at.
:param mail_filter: If set other than 'All' only specific mails will be checked.
See :py:meth:`imaplib.IMAP4.search` for details.
:returns: True if there is an attachment with the given name and False if not.
"""
mail_attachments = self._retrieve_mails_attachments_by_name(
name, check_regex, True, mail_folder, mail_filter
name, check_regex, True, max_mails, mail_folder, mail_filter
)
return bool(mail_attachments)

Expand All @@ -143,6 +150,7 @@ def retrieve_mail_attachments(
*,
check_regex: bool = False,
latest_only: bool = False,
max_mails: int | None = None,
mail_folder: str = "INBOX",
mail_filter: str = "All",
not_found_mode: str = "raise",
Expand All @@ -153,6 +161,7 @@ def retrieve_mail_attachments(
:param name: The name of the attachment that will be downloaded.
:param check_regex: Checks the name for a regular expression.
:param latest_only: If set to True it will only retrieve the first matched attachment.
:param max_mails: Maximum number of latest emails to process. Must be a positive integer, or None when unlimited. Defaults to None.
:param mail_folder: The mail folder where to look at.
:param mail_filter: If set other than 'All' only specific mails will be checked.
See :py:meth:`imaplib.IMAP4.search` for details.
Expand All @@ -164,7 +173,7 @@ def retrieve_mail_attachments(
:returns: a list of tuple each containing the attachment filename and its payload.
"""
mail_attachments = self._retrieve_mails_attachments_by_name(
name, check_regex, latest_only, mail_folder, mail_filter
name, check_regex, latest_only, max_mails, mail_folder, mail_filter
)

if not mail_attachments:
Expand All @@ -179,6 +188,7 @@ def download_mail_attachments(
*,
check_regex: bool = False,
latest_only: bool = False,
max_mails: int | None = None,
mail_folder: str = "INBOX",
mail_filter: str = "All",
not_found_mode: str = "raise",
Expand All @@ -191,6 +201,7 @@ def download_mail_attachments(
where the files will be downloaded to.
:param check_regex: Checks the name for a regular expression.
:param latest_only: If set to True it will only download the first matched attachment.
:param max_mails: Maximum number of latest emails to process. Must be a positive integer, or None when unlimited. Defaults to None.
:param mail_folder: The mail folder where to look at.
:param mail_filter: If set other than 'All' only specific mails will be checked.
See :py:meth:`imaplib.IMAP4.search` for details.
Expand All @@ -201,7 +212,7 @@ def download_mail_attachments(
if set to 'ignore' it won't notify you at all.
"""
mail_attachments = self._retrieve_mails_attachments_by_name(
name, check_regex, latest_only, mail_folder, mail_filter
name, check_regex, latest_only, max_mails, mail_folder, mail_filter
)

if not mail_attachments:
Expand All @@ -218,16 +229,25 @@ def _handle_not_found_mode(self, not_found_mode: str) -> None:
self.log.warning("No mail attachments found!")

def _retrieve_mails_attachments_by_name(
self, name: str, check_regex: bool, latest_only: bool, mail_folder: str, mail_filter: str
) -> list:
self,
name: str,
check_regex: bool,
latest_only: bool,
max_mails: int | None,
mail_folder: str,
mail_filter: str,
) -> list[tuple]:
if not self.mail_client:
raise RuntimeError("The 'mail_client' should be initialized before!")

all_matching_attachments = []
if max_mails is not None and max_mails <= 0:
raise ValueError("max_mails must be a positive integer")

all_matching_attachments: list[tuple] = []

self.mail_client.select(mail_folder)

for mail_id in self._list_mail_ids_desc(mail_filter):
for mail_id in self._list_mail_ids_desc(mail_filter, max_mails=max_mails):
response_mail_body = self._fetch_mail_body(mail_id)
matching_attachments = self._check_mail_body(response_mail_body, name, check_regex, latest_only)

Expand All @@ -240,11 +260,14 @@ def _retrieve_mails_attachments_by_name(

return all_matching_attachments

def _list_mail_ids_desc(self, mail_filter: str) -> Iterable[str]:
def _list_mail_ids_desc(self, mail_filter: str, max_mails: int | None = None) -> Iterable[str]:
if not self.mail_client:
raise RuntimeError("The 'mail_client' should be initialized before!")
_, data = self.mail_client.search(None, mail_filter)
mail_ids = data[0].split()
if max_mails is not None:
mail_ids = mail_ids[-max_mails:]

return reversed(mail_ids)

def _fetch_mail_body(self, mail_id: str) -> str:
Expand Down
51 changes: 51 additions & 0 deletions providers/imap/tests/unit/imap/hooks/test_imap.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,3 +407,54 @@ def test_download_mail_attachments_with_mail_filter(self, mock_imaplib, mock_ope

mock_imaplib.IMAP4_SSL.return_value.search.assert_called_once_with(None, mail_filter)
assert mock_open_method.call_count == 1

@patch(imaplib_string)
def test_retrieve_mail_attachments_with_max_mails(self, mock_imaplib):
mock_conn = _create_fake_imap(mock_imaplib, with_mail=True)
mock_conn.search.return_value = ("OK", [b"1 2 3"])

with ImapHook() as imap_hook:
attachments = imap_hook.retrieve_mail_attachments(
name="test1.csv",
max_mails=1,
)

assert attachments == [("test1.csv", b"SWQsTmFtZQoxLEZlbGl4")]
mock_conn.fetch.assert_called_once()

@patch(imaplib_string)
def test_retrieve_mail_attachments_with_max_mails_zero(self, mock_imaplib):
_create_fake_imap(mock_imaplib, with_mail=True)

with ImapHook() as imap_hook:
with pytest.raises(ValueError, match="max_mails must be a positive integer"):
imap_hook.retrieve_mail_attachments(
name="test1.csv",
max_mails=0,
)

@patch(imaplib_string)
def test_retrieve_mail_attachments_with_max_mails_negative(self, mock_imaplib):
_create_fake_imap(mock_imaplib, with_mail=True)

with ImapHook() as imap_hook:
with pytest.raises(ValueError, match="max_mails must be a positive integer"):
imap_hook.retrieve_mail_attachments(
name="test1.csv",
max_mails=-5,
)

@patch(imaplib_string)
def test_has_mail_attachment_with_max_mails(self, mock_imaplib):
mock_conn = _create_fake_imap(mock_imaplib, with_mail=True)

mock_conn.search.return_value = ("OK", [b"1 2 3 4"])

with ImapHook() as imap_hook:
result = imap_hook.has_mail_attachment(
name="test1.csv",
max_mails=2,
)

assert result is True
assert 1 <= mock_conn.fetch.call_count <= 2