Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ repos:
^providers/apache/iceberg/.*\.py$|
^providers/apache/kafka/.*\.py$|
^providers/arangodb/.*\.py$|
^providers/asana/.*\.py$|
^providers/cloudant/.*\.py$|
^providers/cohere/.*\.py$|
^providers/common/compat/.*\.py$|
Expand Down
72 changes: 33 additions & 39 deletions providers/asana/tests/unit/asana/operators/test_asana_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,17 @@
# under the License.
from __future__ import annotations

from datetime import timedelta
from unittest.mock import MagicMock, patch

import pytest

from airflow.models import Connection
from airflow.models.dag import DAG
from airflow.providers.asana.operators.asana_tasks import (
AsanaCreateTaskOperator,
AsanaDeleteTaskOperator,
AsanaFindTaskOperator,
AsanaUpdateTaskOperator,
)
from airflow.utils import timezone

# The tests do not create dag runs, so db isolation tests are skipped

DEFAULT_DATE = timezone.datetime(2015, 1, 1)
TEST_DAG_ID = "unit_test_dag"
asana_tasks_api_mock = MagicMock(name="asana_tasks_api_for_test")


class TestAsanaTaskOperators:
Expand All @@ -45,72 +36,75 @@ class TestAsanaTaskOperators:

@pytest.fixture(autouse=True)
def setup_connections(self, create_connection_without_db):
args = {"owner": "airflow", "start_date": DEFAULT_DATE}
dag = DAG(TEST_DAG_ID, schedule=timedelta(days=1), default_args=args)
self.dag = dag
create_connection_without_db(Connection(conn_id="asana_test", conn_type="asana", password="test"))

@pytest.mark.db_test
@patch("airflow.providers.asana.hooks.asana.TasksApi", autospec=True, return_value=asana_tasks_api_mock)
def test_asana_create_task_operator(self, mock_tasks_api):
@patch("airflow.providers.asana.operators.asana_tasks.AsanaHook")
def test_asana_create_task_operator(self, mock_asana_hook):
"""
Tests that the AsanaCreateTaskOperator makes the expected call to python-asana given valid arguments.
"""

mock_tasks_api.return_value.create_task.return_value = {"gid": "1"}
mock_hook_instance = MagicMock()
mock_asana_hook.return_value = mock_hook_instance
mock_hook_instance.create_task.return_value = {"gid": "1"}

create_task = AsanaCreateTaskOperator(
task_id="create_task",
conn_id="asana_test",
name="test",
task_parameters={"workspace": "1"},
dag=self.dag,
)
create_task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
assert mock_tasks_api.return_value.create_task.called
result = create_task.execute({})
mock_hook_instance.create_task.assert_called_once_with("test", {"workspace": "1"})
assert result == "1"

@pytest.mark.db_test
@patch("airflow.providers.asana.hooks.asana.TasksApi", autospec=True, return_value=asana_tasks_api_mock)
def test_asana_find_task_operator(self, mock_tasks_api):
@patch("airflow.providers.asana.operators.asana_tasks.AsanaHook")
def test_asana_find_task_operator(self, mock_asana_hook):
"""
Tests that the AsanaFindTaskOperator makes the expected call to python-asana given valid arguments.
"""
mock_tasks_api.return_value.tasks.create.return_value = {"gid": "1"}
find_task = AsanaFindTaskOperator(

mock_hook_instance = MagicMock()
mock_asana_hook.return_value = mock_hook_instance
mock_hook_instance.find_task.return_value = {"gid": "1"}

task = AsanaFindTaskOperator(
task_id="find_task",
conn_id="asana_test",
search_parameters={"project": "test"},
dag=self.dag,
)
find_task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
assert mock_tasks_api.return_value.get_tasks.called
result = task.execute({})
mock_hook_instance.find_task.assert_called_once_with({"project": "test"})
assert result == {"gid": "1"}

@pytest.mark.db_test
@patch("airflow.providers.asana.hooks.asana.TasksApi", autospec=True, return_value=asana_tasks_api_mock)
def test_asana_update_task_operator(self, mock_tasks_api):
@patch("airflow.providers.asana.operators.asana_tasks.AsanaHook")
def test_asana_update_task_operator(self, mock_asana_hook):
"""
Tests that the AsanaUpdateTaskOperator makes the expected call to python-asana given valid arguments.
"""
mock_hook_instance = MagicMock()
mock_asana_hook.return_value = mock_hook_instance
update_task = AsanaUpdateTaskOperator(
task_id="update_task",
conn_id="asana_test",
asana_task_gid="test",
task_parameters={"completed": True},
dag=self.dag,
)
update_task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
assert mock_tasks_api.return_value.update_task.called
update_task.execute({})
mock_hook_instance.update_task.assert_called_once_with("test", {"completed": True})

@pytest.mark.db_test
@patch("airflow.providers.asana.hooks.asana.TasksApi", autospec=True, return_value=asana_tasks_api_mock)
def test_asana_delete_task_operator(self, mock_tasks_api):
@patch("airflow.providers.asana.operators.asana_tasks.AsanaHook")
def test_asana_delete_task_operator(self, mock_asana_hook):
"""
Tests that the AsanaDeleteTaskOperator makes the expected call to python-asana given valid arguments.
"""
mock_hook_instance = MagicMock()
mock_asana_hook.return_value = mock_hook_instance

delete_task = AsanaDeleteTaskOperator(
task_id="delete_task",
conn_id="asana_test",
asana_task_gid="test",
dag=self.dag,
)
delete_task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
assert mock_tasks_api.return_value.delete_task.called
delete_task.execute({})
mock_hook_instance.delete_task.assert_called_once_with("test")
Loading