Skip to content

Commit

Permalink
Add muldelete action to TaskInstanceModelView and tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-astronomer committed Sep 30, 2021
1 parent 414f41e commit 80841e7
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 2 deletions.
10 changes: 9 additions & 1 deletion airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3896,7 +3896,7 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView):
class_permission_name = permissions.RESOURCE_DAG_RUN
method_permission_name = {
'list': 'read',
'action_clear': 'delete',
'action_clear': 'edit',
'action_muldelete': 'delete',
'action_set_running': 'edit',
'action_set_failed': 'edit',
Expand Down Expand Up @@ -4204,6 +4204,7 @@ class TaskInstanceModelView(AirflowPrivilegeVerifierModelView):
method_permission_name = {
'list': 'read',
'action_clear': 'edit',
'action_muldelete': 'delete',
'action_set_running': 'edit',
'action_set_failed': 'edit',
'action_set_success': 'edit',
Expand Down Expand Up @@ -4338,6 +4339,13 @@ def action_clear(self, task_instances, session=None):
self.update_redirect()
return redirect(self.get_redirect())

@action('muldelete', 'Delete', "Are you sure you want to delete selected records?", single=False)
@action_has_dag_edit_access
def action_muldelete(self, items):
self.datamodel.delete_all(items)
self.update_redirect()
return redirect(self.get_redirect())

@provide_session
def set_task_instance_state(self, tis, target_state, session=None):
"""Set task instance state."""
Expand Down
59 changes: 58 additions & 1 deletion tests/www/views/test_views_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from airflow import settings
from airflow.executors.celery_executor import CeleryExecutor
from airflow.models import DagBag, DagModel, TaskInstance
from airflow.models import DagBag, DagModel, TaskInstance, TaskReschedule
from airflow.models.dagcode import DagCode
from airflow.security import permissions
from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES, RUNNABLE_STATES
Expand Down Expand Up @@ -754,3 +754,60 @@ def test_set_task_instance_action_permission_denied(session, client_ti_without_d
follow_redirects=True,
)
check_content_in_response(expected_message, resp)


@pytest.mark.parametrize(
"task_search_tuples",
[
[("example_xcom", "bash_push"), ("example_bash_operator", "run_this_last")],
[("example_subdag_operator", "some-other-task")],
],
ids=['multiple_tasks', 'one_task'],
)
def test_action_muldelete_task_instance(session, admin_client, task_search_tuples):
# get task instances to delete
tasks_to_delete = []
for task_search_tuple in task_search_tuples:
dag_id, task_id = task_search_tuple
tasks_to_delete.append(
session.query(TaskInstance)
.filter(TaskInstance.task_id == task_id, TaskInstance.dag_id == dag_id)
.one()
)

# add task reschedules for those tasks to make sure that the delete cascades to the required tables
trs = [
TaskReschedule(
task=task,
run_id=task.run_id,
try_number=1,
start_date=timezone.datetime(2021, 1, 1),
end_date=timezone.datetime(2021, 1, 2),
reschedule_date=timezone.datetime(2021, 1, 3),
)
for task in tasks_to_delete
]
session.bulk_save_objects(trs)
session.flush()

# run the function to test
resp = admin_client.post(
"/taskinstance/action_post",
data={
"action": "muldelete",
"rowid": [_get_appbuilder_pk_string(TaskInstanceModelView, task) for task in tasks_to_delete],
},
follow_redirects=True,
)

# assert expected behavior for that function and its response
assert resp.status_code == 200
for task_search_tuple in task_search_tuples:
dag_id, task_id = task_search_tuple
assert (
session.query(TaskInstance)
.filter(TaskInstance.task_id == task_id, TaskInstance.dag_id == dag_id)
.count()
== 0
)
assert session.query(TaskReschedule).count() == 0

0 comments on commit 80841e7

Please sign in to comment.