From f25e550314112a110cdc8143e652aa0d2c313387 Mon Sep 17 00:00:00 2001 From: Regis Sinjari Date: Mon, 11 Mar 2024 15:31:36 +0100 Subject: [PATCH] [Fixes #11995] Implement the DELETE method for the User API (#12028) * [Fixes #11995] Implement the DELETE method for the User API * [Fixes #11995] Implement the DELETE method for the User API refactor and docstrings added --- geonode/base/api/tests.py | 14 +++-- geonode/people/tests.py | 125 ++++++++++++++++++++++++++++++++++++++ geonode/people/utils.py | 56 +++++++++++++++++ geonode/people/views.py | 12 +++- geonode/settings.py | 9 +++ 5 files changed, 211 insertions(+), 5 deletions(-) diff --git a/geonode/base/api/tests.py b/geonode/base/api/tests.py index 9f32e5e6c56..88970a91bbc 100644 --- a/geonode/base/api/tests.py +++ b/geonode/base/api/tests.py @@ -464,15 +464,21 @@ def test_delete_user_profile(self): # Bob can't delete user self.assertTrue(self.client.login(username="bobby", password="bob")) response = self.client.delete(url, format="json") - self.assertEqual(response.status_code, 405) - # User can not delete self profile + self.assertEqual(response.status_code, 403) + # User can delete self profile self.assertTrue(self.client.login(username="user_test_delete", password="user")) response = self.client.delete(url, format="json") - self.assertEqual(response.status_code, 405) + self.assertEqual(response.status_code, 200) + self.assertEqual(get_user_model().objects.filter(username="user_test_delete").first(), None) + # recreate user that was deleted + user = get_user_model().objects.create_user( + username="user_test_delete", email="user_test_delete@geonode.org", password="user" + ) + url = reverse("users-detail", kwargs={"pk": user.pk}) # Admin can delete user self.assertTrue(self.client.login(username="admin", password="admin")) response = self.client.delete(url, format="json") - self.assertEqual(response.status_code, 405) + self.assertEqual(response.status_code, 200) finally: user.delete() diff --git a/geonode/people/tests.py b/geonode/people/tests.py index b306bbea220..dc89fd3c3e1 100644 --- a/geonode/people/tests.py +++ b/geonode/people/tests.py @@ -19,6 +19,8 @@ import django from django.test.utils import override_settings from mock import MagicMock, PropertyMock, patch +from geonode.base.models import ResourceBase +from geonode.groups.models import GroupMember, GroupProfile from geonode.tests.base import GeoNodeBaseTestSupport from django.core import mail @@ -58,6 +60,7 @@ def setUp(self): self.permission_type = ("view", "download", "edit") self.groups = Group.objects.all()[:3] self.group_ids = ",".join(str(element.pk) for element in self.groups) + self.bar = GroupProfile.objects.get(slug="bar") def test_redirect_on_get_request(self): """ @@ -786,3 +789,125 @@ def test_users_api_patch_username(self): # username cannot be updated self.assertEqual(response.status_code, 400) self.assertTrue("username cannot be updated" in response.json()["errors"]) + + @override_settings( + USER_DELETION_RULES=["geonode.people.utils.user_has_resources", "geonode.people.utils.user_is_manager"] + ) + def test_valid_delete(self): + # create a new user + tim = get_user_model().objects.create(username="tim") + + admin = get_user_model().objects.get(username="admin") + + self.assertTrue(self.client.login(username="admin", password="admin")) + + # admin wants to delete tim + # Admin is superuser or staff + self.assertTrue(admin.is_superuser or admin.is_staff) + # check that tim is not manager + # nor has any resources + self.assertFalse(ResourceBase.objects.filter(owner_id=tim.pk).exists()) + self.assertFalse(GroupMember.objects.filter(user_id=tim.pk, role="manager").exists()) + + url = f"{reverse('users-list')}/{tim.pk}" + response = self.client.delete(url, content_type="application/json") + + # admin is permitted to delete + self.assertEqual(response.status_code, 200) + # tim has been deleted + self.assertEqual(get_user_model().objects.filter(username="tim").first(), None) + + @override_settings(USER_DELETION_RULES=[]) + @patch("geonode.people.utils.user_deletion_modules", []) + def test_delete_without_validators(self): + + norman = get_user_model().objects.get(username="norman") + admin = get_user_model().objects.get(username="admin") + + self.assertTrue(self.client.login(username="admin", password="admin")) + + # admin wants to delete norman but norman is already promoted + # Admin is superuser or staff + self.assertTrue(admin.is_superuser or admin.is_staff) + + # Make sure norman is not a member + self.assertFalse(self.bar.user_is_member(norman)) + + # Add norman to the self.bar group + self.bar.join(norman) + + # Ensure norman is now a member + self.assertTrue(self.bar.user_is_member(norman)) + + # promote norman to a manager + self.bar.promote(norman) + # Ensure norman is in the managers queryset + self.assertTrue(norman in self.bar.get_managers()) + + url = f"{reverse('users-list')}/{norman.pk}" + response = self.client.delete(url, content_type="application/json") + + # norman can be deleted because validator rules are not applied + self.assertEqual(response.status_code, 200) + self.assertEqual(get_user_model().objects.filter(username="norman").first(), None) + + @override_settings( + USER_DELETION_RULES=["geonode.people.utils.user_has_resources", "geonode.people.utils.user_is_manager"] + ) + def test_delete_a_manger(self): + norman = get_user_model().objects.get(username="norman") + admin = get_user_model().objects.get(username="admin") + + self.assertTrue(self.client.login(username="admin", password="admin")) + + # admin wants to delete norman but norman is already promoted + # Admin is superuser or staff + self.assertTrue(admin.is_superuser or admin.is_staff) + + # Make sure norman is not a member + self.assertFalse(self.bar.user_is_member(norman)) + + # Add norman to the self.bar group + self.bar.join(norman) + + # Ensure norman is now a member + self.assertTrue(self.bar.user_is_member(norman)) + + # promote norman to a manager + self.bar.promote(norman) + # Ensure norman is in the managers queryset + self.assertTrue(norman in self.bar.get_managers()) + + url = f"{reverse('users-list')}/{norman.pk}" + response = self.client.delete(url, content_type="application/json") + + # norman cant be deleted + self.assertEqual(response.status_code, 403) + self.assertNotEqual(get_user_model().objects.filter(username="norman").first(), None) + # + self.assertTrue("user_is_manager" in response.json()["errors"][0]) + + @override_settings(USER_DELETION_RULES=["geonode.people.utils.user_has_resources"]) + def test_delete_a_user_with_resource(self): + # create a new user + bobby = get_user_model().objects.get(username="bobby") + admin = get_user_model().objects.get(username="admin") + + self.assertTrue(self.client.login(username="admin", password="admin")) + + # admin wants to delete bobby + # Admin is superuser or staff + self.assertTrue(admin.is_superuser or admin.is_staff) + # check that bobby is not manager + # but he has resources already assigned + self.assertTrue(ResourceBase.objects.filter(owner_id=bobby.pk).exists()) + self.assertFalse(GroupMember.objects.filter(user_id=bobby.pk, role="manager").exists()) + + url = f"{reverse('users-list')}/{bobby.pk}" + response = self.client.delete(url, content_type="application/json") + + # admin is permitted to delete + self.assertEqual(response.status_code, 403) + # bobby cant be deleted + self.assertNotEqual(get_user_model().objects.filter(username="bobby").first(), None) + self.assertTrue("user_has_resources" in response.json()["errors"][0]) diff --git a/geonode/people/utils.py b/geonode/people/utils.py index 99ec757f220..9df70ff0de6 100644 --- a/geonode/people/utils.py +++ b/geonode/people/utils.py @@ -22,8 +22,14 @@ from django.contrib.auth.models import Group from geonode import GeoNodeException +from geonode.base.models import ResourceBase from geonode.groups.models import GroupProfile, GroupMember from geonode.groups.conf import settings as groups_settings +from rest_framework.exceptions import PermissionDenied +from django.conf import settings +from django.utils.module_loading import import_string + +# from geonode.people.models import Profile def get_default_user(): @@ -140,3 +146,53 @@ def get_available_users(user): member_ids.extend(users_ids) return get_user_model().objects.filter(id__in=member_ids) + + +def user_has_resources(profile) -> bool: + """ + checks if user has any resource in ownership + + Args: + profile (Profile) : accepts a userprofile instance. + + Returns: + bool: profile is the owner of any resources + """ + return ResourceBase.objects.filter(owner_id=profile.pk).exists() + + +def user_is_manager(profile) -> bool: + """ + Checks if user is the manager of any group + + Args: + profile (Profile) : accepts a userprofile instance. + + Returns: + bool: profile is mangager or not + + """ + return GroupMember.objects.filter(user_id=profile.pk, role=GroupMember.MANAGER).exists() + + +def call_user_deletion_rules(profile) -> None: + """ + calls a set of defined rules specific to the deletion of a user + which are read from settings.USER_DELETION_RULES + new rules can be added as long as they take as parameter the userprofile + and return a boolean + Args: + profile (Profile) : accepts a userprofile instance. + + Returns: + None : Raises PermissionDenied Exception in case any of the deletion rule Fail + """ + if not globals().get("user_deletion_modules"): + rule_path = settings.USER_DELETION_RULES if hasattr(settings, "USER_DELETION_RULES") else [] + globals()["user_deletion_modules"] = [import_string(deletion_rule) for deletion_rule in rule_path] + error_list = [] + for not_deletable in globals().get("user_deletion_modules", []): + if not_deletable(profile): + error_list.append(not_deletable.__name__) + if error_list: + raise PermissionDenied(f"Deletion rule Violated: {', '.join(error_list)}") diff --git a/geonode/people/views.py b/geonode/people/views.py index e62dbdd1327..184f6a28b60 100644 --- a/geonode/people/views.py +++ b/geonode/people/views.py @@ -54,6 +54,7 @@ from geonode.security.utils import get_visible_resources from guardian.shortcuts import get_objects_for_user from rest_framework.exceptions import PermissionDenied +from geonode.people.utils import call_user_deletion_rules class SetUserLayerPermission(View): @@ -166,7 +167,7 @@ class UserViewSet(DynamicModelViewSet): API endpoint that allows users to be viewed or edited. """ - http_method_names = ["get", "post", "patch"] + http_method_names = ["get", "post", "patch", "delete"] authentication_classes = [SessionAuthentication, BasicAuthentication, OAuth2Authentication] permission_classes = [ IsAuthenticated, @@ -204,6 +205,15 @@ def update(self, request, *args, **kwargs): serializer.save() return Response(serializer.data) + def destroy(self, request, *args, **kwargs): + instance = self.get_object() + self.perform_destroy(instance) + return Response("User deleted sucessfully", status=200) + + def perform_destroy(self, instance): + call_user_deletion_rules(instance) + instance.delete() + @extend_schema( methods=["get"], responses={200: ResourceBaseSerializer(many=True)}, diff --git a/geonode/settings.py b/geonode/settings.py index bd597c2ac93..b0bbe92c19e 100644 --- a/geonode/settings.py +++ b/geonode/settings.py @@ -2220,6 +2220,15 @@ def get_geonode_catalogue_service(): ] +""" +List of modules that implement the deletion rules for a user +""" +USER_DELETION_RULES = [ + "geonode.people.utils.user_has_resources" + # ,"geonode.people.utils.user_is_manager" +] + + """ Define the URLs patterns used by the SizeRestrictedFileUploadHandler to evaluate if the file is greater than the limit size defined