Skip to content

Commit

Permalink
Update get_[users|groups]_with_perms to match guardian
Browse files Browse the repository at this point in the history
fixes pulp#2739
  • Loading branch information
newswangerd authored and mdellweg committed Jun 1, 2022
1 parent 18d9f70 commit b8c918e
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGES/2739.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add options to the role_util functions to make them work the same as guardian did.
139 changes: 99 additions & 40 deletions pulpcore/app/role_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,17 @@ def get_objects_for_group(group, perms, qs, any_perm=False, accept_global_perms=


def get_users_with_perms_roles(
obj, with_superusers=False, with_group_users=True, only_with_perms_in=None
obj,
with_superusers=False,
with_group_users=True,
only_with_perms_in=None,
include_model_permissions=True,
for_concrete_model=False,
):
qs = User.objects.none()
if with_superusers:
qs |= User.objects.filter(is_superuser=True)
ctype = ContentType.objects.get_for_model(obj, for_concrete_model=False)
ctype = ContentType.objects.get_for_model(obj, for_concrete_model=for_concrete_model)
perms = Permission.objects.filter(content_type__pk=ctype.id)
if only_with_perms_in:
codenames = [
Expand All @@ -234,9 +239,12 @@ def get_users_with_perms_roles(
if len(split_perm) == 1 or split_perm[0] == ctype.app_label
]
perms = perms.filter(codename__in=codenames)
user_roles = UserRole.objects.filter(role__permissions__in=perms).filter(
Q(object_id=None) | Q(content_type=ctype, object_id=obj.pk)
)

object_query = Q(content_type=ctype, object_id=obj.pk)
if include_model_permissions:
object_query = Q(object_id=None) | object_query

user_roles = UserRole.objects.filter(role__permissions__in=perms).filter(object_query)
qs |= User.objects.filter(Exists(user_roles.filter(user=OuterRef("pk"))))
if with_group_users:
group_roles = GroupRole.objects.filter(role__permissions__in=perms).filter(
Expand All @@ -248,9 +256,14 @@ def get_users_with_perms_roles(


def get_users_with_perms_attached_perms(
obj, with_superusers=False, with_group_users=True, only_with_perms_in=None
obj,
with_superusers=False,
with_group_users=True,
only_with_perms_in=None,
include_model_permissions=True,
for_concrete_model=False,
):
ctype = ContentType.objects.get_for_model(obj, for_concrete_model=False)
ctype = ContentType.objects.get_for_model(obj, for_concrete_model=for_concrete_model)
perms = Permission.objects.filter(content_type__pk=ctype.id)
if only_with_perms_in:
codenames = [
Expand All @@ -259,9 +272,12 @@ def get_users_with_perms_attached_perms(
if len(split_perm) == 1 or split_perm[0] == ctype.app_label
]
perms = perms.filter(codename__in=codenames)
user_roles = UserRole.objects.filter(role__permissions__in=perms).filter(
Q(object_id=None) | Q(content_type=ctype, object_id=obj.pk)
)

object_query = Q(content_type=ctype, object_id=obj.pk)
if include_model_permissions:
object_query = Q(object_id=None) | object_query

user_roles = UserRole.objects.filter(role__permissions__in=perms).filter(object_query)
res = defaultdict(set)
if with_superusers:
for user in User.objects.filter(is_superuser=True):
Expand All @@ -271,9 +287,7 @@ def get_users_with_perms_attached_perms(
user_role.role.permissions.filter(pk__in=perms).values_list("codename", flat=True)
)
if with_group_users:
group_roles = GroupRole.objects.filter(role__permissions__in=perms).filter(
Q(object_id=None) | Q(content_type=ctype, object_id=obj.pk)
)
group_roles = GroupRole.objects.filter(role__permissions__in=perms).filter(object_query)
for group_role in group_roles:
for user in group_role.group.user_set.all():
res[user].update(
Expand All @@ -284,8 +298,14 @@ def get_users_with_perms_attached_perms(
return {k: list(v) for k, v in res.items()}


def get_users_with_perms_attached_roles(obj, with_group_users=True, only_with_perms_in=None):
ctype = ContentType.objects.get_for_model(obj, for_concrete_model=False)
def get_users_with_perms_attached_roles(
obj,
with_group_users=True,
only_with_perms_in=None,
include_model_permissions=True,
for_concrete_model=False,
):
ctype = ContentType.objects.get_for_model(obj, for_concrete_model=for_concrete_model)
perms = Permission.objects.filter(content_type__pk=ctype.id)
if only_with_perms_in:
codenames = [
Expand All @@ -294,16 +314,17 @@ def get_users_with_perms_attached_roles(obj, with_group_users=True, only_with_pe
if len(split_perm) == 1 or split_perm[0] == ctype.app_label
]
perms = perms.filter(codename__in=codenames)
user_roles = UserRole.objects.filter(role__permissions__in=perms).filter(
Q(object_id=None) | Q(content_type=ctype, object_id=obj.pk)
)

object_query = Q(content_type=ctype, object_id=obj.pk)
if include_model_permissions:
object_query = Q(object_id=None) | object_query

user_roles = UserRole.objects.filter(role__permissions__in=perms).filter(object_query)
res = defaultdict(set)
for user_role in user_roles:
res[user_role.user].update(user_role.role.name)
if with_group_users:
group_roles = GroupRole.objects.filter(role__permissions__in=perms).filter(
Q(object_id=None) | Q(content_type=ctype, object_id=obj.pk)
)
group_roles = GroupRole.objects.filter(role__permissions__in=perms).filter(object_query)
for group_role in group_roles:
for user in group_role.group.user_set.all():
res[user].update(group_role.role.name)
Expand All @@ -312,7 +333,13 @@ def get_users_with_perms_attached_roles(obj, with_group_users=True, only_with_pe

# Interface copied from django guardian
def get_users_with_perms(
obj, attach_perms=False, with_superusers=False, with_group_users=True, only_with_perms_in=None
obj,
attach_perms=False,
with_superusers=False,
with_group_users=True,
only_with_perms_in=None,
include_model_permissions=True,
for_concrete_model=False,
):
if attach_perms:
res = defaultdict(set)
Expand All @@ -322,6 +349,8 @@ def get_users_with_perms(
with_superusers=with_superusers,
with_group_users=with_group_users,
only_with_perms_in=only_with_perms_in,
include_model_permissions=include_model_permissions,
for_concrete_model=for_concrete_model,
).items():
res[key].update(value)
return {k: list(v) for k, v in res.items()}
Expand All @@ -332,12 +361,16 @@ def get_users_with_perms(
with_superusers=with_superusers,
with_group_users=with_group_users,
only_with_perms_in=only_with_perms_in,
include_model_permissions=include_model_permissions,
for_concrete_model=for_concrete_model,
)
return qs.distinct()


def get_groups_with_perms_roles(obj, only_with_perms_in=None):
ctype = ContentType.objects.get_for_model(obj, for_concrete_model=False)
def get_groups_with_perms_roles(
obj, only_with_perms_in=None, include_model_permissions=True, for_concrete_model=False
):
ctype = ContentType.objects.get_for_model(obj, for_concrete_model=for_concrete_model)
perms = Permission.objects.filter(content_type__pk=ctype.id)
if only_with_perms_in:
codenames = [
Expand All @@ -346,15 +379,20 @@ def get_groups_with_perms_roles(obj, only_with_perms_in=None):
if len(split_perm) == 1 or split_perm[0] == ctype.app_label
]
perms = perms.filter(codename__in=codenames)
group_roles = GroupRole.objects.filter(role__permissions__in=perms).filter(
Q(object_id=None) | Q(content_type=ctype, object_id=obj.pk)
)

object_query = Q(content_type=ctype, object_id=obj.pk)
if include_model_permissions:
object_query = Q(object_id=None) | object_query

group_roles = GroupRole.objects.filter(role__permissions__in=perms).filter(object_query)
qs = Group.objects.filter(Exists(group_roles.filter(group=OuterRef("pk")))).distinct()
return qs


def get_groups_with_perms_attached_perms(obj, only_with_perms_in=None):
ctype = ContentType.objects.get_for_model(obj, for_concrete_model=False)
def get_groups_with_perms_attached_perms(
obj, only_with_perms_in=None, include_model_permissions=True, for_concrete_model=False
):
ctype = ContentType.objects.get_for_model(obj, for_concrete_model=for_concrete_model)
perms = Permission.objects.filter(content_type__pk=ctype.id)
if only_with_perms_in:
codenames = [
Expand All @@ -363,9 +401,12 @@ def get_groups_with_perms_attached_perms(obj, only_with_perms_in=None):
if len(split_perm) == 1 or split_perm[0] == ctype.app_label
]
perms = perms.filter(codename__in=codenames)
group_roles = GroupRole.objects.filter(role__permissions__in=perms).filter(
Q(object_id=None) | Q(content_type=ctype, object_id=obj.pk)
)

object_query = Q(content_type=ctype, object_id=obj.pk)
if include_model_permissions:
object_query = Q(object_id=None) | object_query

group_roles = GroupRole.objects.filter(role__permissions__in=perms).filter(object_query)
res = defaultdict(set)
for group_role in group_roles:
res[group_role.group].update(
Expand All @@ -374,8 +415,10 @@ def get_groups_with_perms_attached_perms(obj, only_with_perms_in=None):
return {k: list(v) for k, v in res.items()}


def get_groups_with_perms_attached_roles(obj, only_with_perms_in=None):
ctype = ContentType.objects.get_for_model(obj, for_concrete_model=False)
def get_groups_with_perms_attached_roles(
obj, only_with_perms_in=None, include_model_permissions=True, for_concrete_model=False
):
ctype = ContentType.objects.get_for_model(obj, for_concrete_model=for_concrete_model)
perms = Permission.objects.filter(content_type__pk=ctype.id)
if only_with_perms_in:
codenames = [
Expand All @@ -384,25 +427,41 @@ def get_groups_with_perms_attached_roles(obj, only_with_perms_in=None):
if len(split_perm) == 1 or split_perm[0] == ctype.app_label
]
perms = perms.filter(codename__in=codenames)
group_roles = GroupRole.objects.filter(role__permissions__in=perms).filter(
Q(object_id=None) | Q(content_type=ctype, object_id=obj.pk)
)

object_query = Q(content_type=ctype, object_id=obj.pk)
if include_model_permissions:
object_query = Q(object_id=None) | object_query

group_roles = GroupRole.objects.filter(role__permissions__in=perms).filter(object_query)
res = defaultdict(set)
for group_role in group_roles:
res[group_role.group].add(group_role.role.name)
return {k: list(v) for k, v in res.items()}


# Interface copied from django guardian
def get_groups_with_perms(obj, attach_perms=False):
def get_groups_with_perms(
obj,
attach_perms=False,
include_model_permissions=True,
for_concrete_model=False,
):
if attach_perms:
res = defaultdict(set)
if "pulpcore.backends.ObjectRolePermissionBackend" in settings.AUTHENTICATION_BACKENDS:
for key, value in get_groups_with_perms_attached_perms(obj).items():
for key, value in get_groups_with_perms_attached_perms(
obj,
include_model_permissions=include_model_permissions,
for_concrete_model=for_concrete_model,
).items():
res[key].update(value)
return {k: list(v) for k, v in res.items()}
else:
qs = Group.objects.none()
if "pulpcore.backends.ObjectRolePermissionBackend" in settings.AUTHENTICATION_BACKENDS:
qs |= get_groups_with_perms_roles(obj)
qs |= get_groups_with_perms_roles(
obj,
include_model_permissions=include_model_permissions,
for_concrete_model=for_concrete_model,
)
return qs.distinct()

0 comments on commit b8c918e

Please sign in to comment.