diff --git a/geonode/people/tests.py b/geonode/people/tests.py index dc89fd3c3e1..7bc783864fc 100644 --- a/geonode/people/tests.py +++ b/geonode/people/tests.py @@ -36,6 +36,7 @@ from geonode.people import profileextractors from geonode.base.populate_test_data import all_public, create_models, remove_models +from django.db.models import Q class PeopleAndProfileTests(GeoNodeBaseTestSupport): @@ -911,3 +912,156 @@ def test_delete_a_user_with_resource(self): # bobby cant be deleted self.assertNotEqual(get_user_model().objects.filter(username="bobby").first(), None) self.assertTrue("user_has_resources" in response.json()["errors"][0]) + + def test_transfer_resources_all(self): + """ + user wants to transfer resources to target + """ + bobby = get_user_model().objects.get(username="bobby") + norman = get_user_model().objects.get(username="norman") + # group = GroupProfile.objects.get(slug="bar") + + self.assertTrue(self.client.login(username="bobby", password="bob")) + self.assertTrue(bobby.is_authenticated) + # check bobbys resources + bobby_resources = ResourceBase.objects.filter(owner=bobby) + prior_bobby_resources = bobby_resources.all() + self.assertTrue(bobby_resources.exists()) + # call api + response = self.client.post( + path=f"{reverse('users-list')}/{bobby.pk}/transfer_resources", data={"owner": norman.id} + ) + # check that bobby owns the resources no more + self.assertFalse(bobby_resources.exists()) + self.assertEqual(response.status_code, 200) + # check that the resources have been transvered to norman + norman_resources = ResourceBase.objects.filter(owner=norman).all() + self.assertTrue(set(prior_bobby_resources).issubset(set(norman_resources))) + + def test_transfer_resources_invalid_user(self): + """ + user wants to transfer resources to target + """ + bobby = get_user_model().objects.get(username="bobby") + invalid_user_id = get_user_model().objects.last().id + 1 + + self.assertTrue(self.client.login(username="bobby", password="bob")) + self.assertTrue(bobby.is_authenticated) + # check bobbys resources + bobby_resources = ResourceBase.objects.filter(owner=bobby) + prior_bobby_resources = bobby_resources.all() + self.assertTrue(bobby_resources.exists()) + # call api + response = self.client.post( + path=f"{reverse('users-list')}/{bobby}/transfer_resources", data={"owner": invalid_user_id} + ) + # response should be 404 + self.assertEqual(response.status_code, 404) + # check that bobby still owns the resources + later_bobby_resources = bobby_resources.all() + self.assertTrue(bobby_resources.exists()) + # and no change has happened to them + self.assertTrue(set(prior_bobby_resources) == set(later_bobby_resources)) + + def test_transfer_resources_default(self): + """ + user wants to transfer resources to target + """ + bobby = get_user_model().objects.get(username="bobby") + admin = get_user_model().objects.get(username="admin") + + self.assertTrue(self.client.login(username="bobby", password="bob")) + self.assertTrue(bobby.is_authenticated) + + # check bobbys resources + bobby_resources = ResourceBase.objects.filter(owner=bobby) + prior_bobby_resources = bobby_resources.all() + self.assertTrue(bobby_resources.exists()) + # call api + response = self.client.post( + path=f"{reverse('users-list')}/{bobby.pk}/transfer_resources", data={"owner": "DEFAULT"} + ) + self.assertTrue(response.status_code == 200) + # check that bobby owns the resources no more + self.assertFalse(bobby_resources.exists()) + # check that the resources have been transfered to admin + admin_resources = ResourceBase.objects.filter(owner=admin).all() + self.assertTrue(set(prior_bobby_resources).issubset(set(admin_resources))) + + def test_transfer_resources_to_missing_default(self): + """ + user wants to transfer resources to principal, + but a principal account is missing + """ + bobby = get_user_model().objects.get(username="bobby") + admin = get_user_model().objects.get(username="admin") + + self.assertTrue(self.client.login(username="bobby", password="bob")) + self.client.force_login(bobby) + self.assertTrue(bobby.is_authenticated) + # removal of admin accounts + admin.is_superuser = False + admin.is_staff = False + admin.save() + self.assertFalse(get_user_model().objects.filter(Q(is_superuser=True) | Q(is_staff=True)).exists()) + + # check bobbys resources + bobby_resources = ResourceBase.objects.filter(owner=bobby) + prior_bobby_resources = bobby_resources.all() + self.assertTrue(bobby_resources.exists()) + # call api + response = self.client.post( + path=f"{reverse('users-list')}/{bobby.pk}/transfer_resources", data={"owner": "DEFAULT"} + ) + self.assertTrue(response.status_code == 500) + self.assertEqual(response.data, "Principal User not found") + # check that bobby still owns the resources + later_bobby_resources = bobby_resources.all() + # check that the resources havent changed + self.assertTrue(set(prior_bobby_resources) == set(later_bobby_resources)) + + def test_transfer_resources_to_self(self): + """ + user wants to transfer resources to self but should be unable to + """ + bobby = get_user_model().objects.get(username="bobby") + + self.assertTrue(self.client.login(username="bobby", password="bob")) + self.assertTrue(bobby.is_authenticated) + + # check bobbys resources + bobby_resources = ResourceBase.objects.filter(owner=bobby) + prior_bobby_resources = bobby_resources.all() + self.assertTrue(bobby_resources.exists()) + + # call api + response = self.client.post( + path=f"{reverse('users-list')}/{bobby.pk}/transfer_resources", data={"owner": bobby.pk} + ) + self.assertTrue(response.status_code == 400) + self.assertEqual(response.data, "Cannot reassign to self") + # check that bobby still owns the resources + later_bobby_resources = ResourceBase.objects.filter(owner=bobby).all() + # check that the resources havent changed + self.assertTrue(set(prior_bobby_resources) == set(later_bobby_resources)) + + def test_transfer_resources_nopayload(self): + """ + user wants to transfer resources to target + """ + bobby = get_user_model().objects.get(username="bobby") + self.assertTrue(self.client.login(username="bobby", password="bob")) + self.assertTrue(bobby.is_authenticated) + # check bobbys resources + bobby_resources = ResourceBase.objects.filter(owner=bobby) + prior_bobby_resources = bobby_resources.all() + self.assertTrue(bobby_resources.exists()) + + # call api + response = self.client.post(path=f"{reverse('users-list')}/{bobby.pk}/transfer_resources", data={}) + # response should be 404 + self.assertEqual(response.status_code, 404) + # check that bobby still owns the resources + self.assertTrue(bobby_resources.exists()) + later_bobby_resources = ResourceBase.objects.filter(owner=bobby).all() + self.assertTrue(set(prior_bobby_resources) == set(later_bobby_resources)) diff --git a/geonode/people/views.py b/geonode/people/views.py index 184f6a28b60..7da510cf711 100644 --- a/geonode/people/views.py +++ b/geonode/people/views.py @@ -251,6 +251,28 @@ def groups(self, request, pk=None): groups = GroupProfile.objects.filter(id__in=qs_ids) return Response(GroupProfileSerializer(embed=True, many=True).to_representation(groups)) + @action(detail=True, methods=["post"]) + def transfer_resources(self, request, pk=None): + user = self.get_object() + admin = get_user_model().objects.filter(is_superuser=True, is_staff=True).first() + target_user = request.data.get("owner") + + target = None + if target_user == "DEFAULT": + if not admin: + return Response("Principal User not found", status=500) + target = admin + else: + target = get_object_or_404(get_user_model(), id=target_user) + + if target == user: + return Response("Cannot reassign to self", status=400) + + # transfer to target + ResourceBase.objects.filter(owner=user).update(owner=target or user) + + return Response("Resources transfered successfully", status=200) + class ProfileAutocomplete(autocomplete.Select2QuerySetView): def get_queryset(self):