Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RHCLOUD-33414] - restrict the Custom default access group renaming #1383

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions rbac/management/group/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,21 @@ def get_roleCount(self, obj):
"""Role count for the serializer."""
return obj.role_count()

def is_custom_default_group(self, group):
"""Check if the group is custom default or not."""
if group.platform_default and not group.system:
return True
return False
EvanCasey13 marked this conversation as resolved.
Show resolved Hide resolved

def restrict_custom_default_group_renaming(self, group):
"""Restrict users from changing the name or description of the Custom default group."""
is_custom_default = self.is_custom_default_group(group)
if is_custom_default:
key = "detail"
message = "Updating the name or description of 'Custom default group' is restricted"
error = {key: (message)}
raise serializers.ValidationError(error)

class Meta:
"""Metadata for the serializer."""

Expand All @@ -69,6 +84,7 @@ def create(self, validated_data):
def update(self, instance, validated_data):
"""Update the role object in the database."""
group = super().update(instance, validated_data)
self.restrict_custom_default_group_renaming(group)
group_obj_change_notification_handler(self.context["request"].user, group, "updated")
return group

Expand Down
1 change: 1 addition & 0 deletions rbac/management/group/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ def update(self, request, *args, **kwargs):
self.protect_system_groups("update")

group = self.get_object()

if not request.user.admin:
self.protect_group_with_user_access_admin_role(group.roles_with_access(), "update_group")

Expand Down
38 changes: 27 additions & 11 deletions tests/management/group/test_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ def setUp(self):
self.defPolicy = Policy(name="defPolicy", system=True, tenant=self.public_tenant, group=self.defGroup)
self.defPolicy.save()

self.custom_default_group = Group(
name="Custom default group", platform_default=True, system=False, tenant=self.public_tenant
)
self.custom_default_group.save()

self.adminGroup = Group(name="groupAdmin", admin_default=True, tenant=self.public_tenant, system=True)
self.adminGroup.save()
self.adminGroup.principals.add(self.principal, self.test_principal)
Expand Down Expand Up @@ -440,11 +445,11 @@ def test_read_group_list_success(self):
for keyname in ["meta", "links", "data"]:
self.assertIn(keyname, response.data)
self.assertIsInstance(response.data.get("data"), list)
self.assertEqual(len(response.data.get("data")), 6)
self.assertEqual(len(response.data.get("data")), 7)

group = response.data.get("data")[0]
self.assertIsNotNone(group.get("name"))
self.assertEqual(group.get("name"), self.group.name)
self.assertEqual(group.get("name"), self.custom_default_group.name)

# check that all fields from GroupInputSerializer are present
for key in GroupInputSerializer().fields.keys():
Expand Down Expand Up @@ -546,15 +551,15 @@ def test_get_group_by_partial_name_by_default(self):
url = "{}?name={}".format(url, "group")
client = APIClient()
response = client.get(url, **self.headers)
self.assertEqual(response.data.get("meta").get("count"), 6)
self.assertEqual(response.data.get("meta").get("count"), 7)

def test_get_group_by_partial_name_explicit(self):
"""Test that getting groups by name returns partial match when specified."""
url = reverse("v1_management:group-list")
url = "{}?name={}&name_match={}".format(url, "group", "partial")
client = APIClient()
response = client.get(url, **self.headers)
self.assertEqual(response.data.get("meta").get("count"), 6)
self.assertEqual(response.data.get("meta").get("count"), 7)

def test_get_group_by_name_invalid_criteria(self):
"""Test that getting groups by name fails with invalid name_match."""
Expand Down Expand Up @@ -657,13 +662,14 @@ def test_filter_group_list_by_system_false(self):
response = client.get(url, **self.headers)

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data.get("data")), 4)
self.assertEqual(len(response.data.get("data")), 5)
response_group_uuids = [group["uuid"] for group in response.data.get("data")]
self.assertCountEqual(
response_group_uuids,
[
str(self.group.uuid),
str(self.groupB.uuid),
str(self.custom_default_group.uuid),
str(self.emptyGroup.uuid),
str(self.groupMultiRole.uuid),
],
Expand Down Expand Up @@ -726,12 +732,13 @@ def test_filter_group_list_by_admin_default_false(self):
response = client.get(url, **self.headers)

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data.get("data")), 5)
self.assertEqual(len(response.data.get("data")), 6)
response_group_uuids = [group["uuid"] for group in response.data.get("data")]
self.assertCountEqual(
response_group_uuids,
[
str(self.group.uuid),
str(self.custom_default_group.uuid),
str(self.groupB.uuid),
str(self.emptyGroup.uuid),
str(self.groupMultiRole.uuid),
Expand All @@ -755,6 +762,7 @@ def test_update_group_success(self, send_kafka_message, mock_request):
url = reverse("v1_management:group-detail", kwargs={"uuid": self.group.uuid})
client = APIClient()
response = client.put(url, test_data, format="json", **self.headers)

self.assertEqual(response.status_code, status.HTTP_200_OK)

self.assertIsNotNone(response.data.get("uuid"))
Expand Down Expand Up @@ -808,6 +816,14 @@ def test_update_default_group(self):
response = client.put(url, test_data, format="json", **self.headers)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

def test_update_custom_default_group(self):
"""Test that Custom default group is protected from updates"""
url = reverse("v1_management:group-detail", kwargs={"uuid": self.custom_default_group.uuid})
test_data = {"name": "new_name" + "_updated", "description": "new_description" + "_updated"}
client = APIClient()
response = client.put(url, test_data, format="json", **self.headers)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

def test_update_admin_default_group(self):
"""Test that admin_default groups are protected from updates"""
url = reverse("v1_management:group-detail", kwargs={"uuid": self.adminGroup.uuid})
Expand Down Expand Up @@ -1018,8 +1034,8 @@ def assert_group_tuples(tuple_to_replicate):

url = f"{reverse('v1_management:group-list')}?platform_default=true"
response = client.get(url, **self.headers)
self.assertEqual(len(response.data.get("data")), 1)
self.assertEqual(response.data.get("data")[0]["uuid"], str(self.defGroup.uuid))
self.assertEqual(len(response.data.get("data")), 2)
self.assertEqual(response.data.get("data")[1]["uuid"], str(self.defGroup.uuid))

def test_delete_group_invalid(self):
"""Test that deleting an invalid group returns an error."""
Expand Down Expand Up @@ -1343,7 +1359,7 @@ def test_get_group_by_username(self, mock_request):
url = "{}?username={}".format(url, self.test_principal.username)
client = APIClient()
response = client.get(url, **self.test_headers)
self.assertEqual(response.data.get("meta").get("count"), 4)
self.assertEqual(response.data.get("meta").get("count"), 5)

# Return bad request when user does not exist
url = reverse("v1_management:group-list")
Expand Down Expand Up @@ -1376,7 +1392,7 @@ def test_get_group_by_username_no_assigned_group(self, mock_request):
url = "{}?username={}".format(url, self.principalC.username)
client = APIClient()
response = client.get(url, **self.test_headers)
self.assertEqual(response.data.get("meta").get("count"), 2)
self.assertEqual(response.data.get("meta").get("count"), 3)

@patch(
"management.principal.proxy.PrincipalProxy.request_filtered_principals",
Expand Down Expand Up @@ -1432,7 +1448,7 @@ def test_get_group_by_username_with_capitalization(self, mock_request):
url = "{}?username={}".format(url, username)
client = APIClient()
response = client.get(url, **self.test_headers)
self.assertEqual(response.data.get("meta").get("count"), 4)
self.assertEqual(response.data.get("meta").get("count"), 5)

def test_get_group_roles_success(self):
"""Test that getting roles for a group returns successfully."""
Expand Down
Loading