Skip to content

Commit

Permalink
[change:api] Allow org managers and owners to view shared objects #238
Browse files Browse the repository at this point in the history
Updated API permissions classes to allow organization managers and
organization owners to view shared objects.

Related to #238
  • Loading branch information
pandafy authored Mar 11, 2022
1 parent 9ac518d commit 3777960
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 20 deletions.
6 changes: 6 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,12 @@ Usage example:
permission_classes = (DjangoModelPermissions,)
queryset = Template.objects.all()
**Note:** ``DjangoModelPermissions`` allows users who
are either organization managers or owners to view
shared objects in read only mode.

Standard users will not be able to view or list shared objects.

Django REST Framework Mixins
----------------------------

Expand Down
33 changes: 26 additions & 7 deletions openwisp_users/api/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,25 @@


class OrgLookup:
@property
def org_field(self):
return getattr(self, 'organization_field', 'organization')

@property
def organization_lookup(self):
org_field = getattr(self, 'organization_field', 'organization')
return f'{org_field}__in'
return f'{self.org_field}__in'


class SharedObjectsLookup:
@property
def queryset_organization_conditions(self):
conditions = super().queryset_organization_conditions
organizations = getattr(self.request.user, self._user_attr)
# If user has access to any organization, then include shared
# objects in the queryset.
if len(organizations):
conditions |= Q(**{f'{self.org_field}__isnull': True})
return conditions


class FilterByOrganization(OrgLookup):
Expand All @@ -26,6 +41,12 @@ class FilterByOrganization(OrgLookup):
def _user_attr(self):
raise NotImplementedError()

@property
def queryset_organization_conditions(self):
return Q(
**{self.organization_lookup: getattr(self.request.user, self._user_attr)}
)

def get_queryset(self):
qs = super().get_queryset()
if self.request.user.is_superuser:
Expand All @@ -35,9 +56,7 @@ def get_queryset(self):
def get_organization_queryset(self, qs):
if self.request.user.is_anonymous:
return
return qs.filter(
**{self.organization_lookup: getattr(self.request.user, self._user_attr)}
)
return qs.filter(self.queryset_organization_conditions)


class FilterByOrganizationMembership(FilterByOrganization):
Expand All @@ -48,15 +67,15 @@ class FilterByOrganizationMembership(FilterByOrganization):
_user_attr = 'organizations_dict'


class FilterByOrganizationManaged(FilterByOrganization):
class FilterByOrganizationManaged(SharedObjectsLookup, FilterByOrganization):
"""
Filter queryset by organizations managed by user
"""

_user_attr = 'organizations_managed'


class FilterByOrganizationOwned(FilterByOrganization):
class FilterByOrganizationOwned(SharedObjectsLookup, FilterByOrganization):
"""
Filter queryset by organizations owned by user
"""
Expand Down
71 changes: 59 additions & 12 deletions openwisp_users/api/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,14 @@
Organization = load_model('openwisp_users', 'Organization')


class BaseOrganizationPermission(BasePermission):
def has_object_permission(self, request, view, obj):
organization = self.get_object_organization(view, obj)
return self.validate_membership(request.user, organization)

def has_permission(self, request, view):
return request.user and request.user.is_authenticated

class ObjectOrganizationMixin(object):
def get_object_organization(self, view, obj):
organization_field = getattr(view, 'organization_field', 'organization')
fields = organization_field.split('__')
accessed_object = obj
for field in fields:
accessed_object = getattr(accessed_object, field, None)
if not accessed_object:
accessed_object = getattr(accessed_object, field, False)
if accessed_object is False:
raise AttributeError(
_(
'Organization not found, `organization_field` '
Expand All @@ -31,6 +24,25 @@ def get_object_organization(self, view, obj):
)
return accessed_object


class BaseOrganizationPermission(ObjectOrganizationMixin, BasePermission):
def has_object_permission(self, request, view, obj):
# Superuser bypasses organization permission checks
if request.user.is_superuser:
return True
organization = self.get_object_organization(view, obj)
if organization is None:
# User should be allowed access to shared objects only if
# they are manager or owner of atleast one organization.
return (
len(request.user.organizations_managed) >= 1
or len(request.user.organizations_owned) >= 1
)
return self.validate_membership(request.user, organization)

def has_permission(self, request, view):
return request.user and request.user.is_authenticated

def validate_membership(self, user, org):
raise NotImplementedError(
_(
Expand All @@ -57,6 +69,12 @@ class IsOrganizationManager(BaseOrganizationPermission):
'requested resource belongs.'
)

def has_permission(self, request, view):
# User must be manager of atleast one organization.
return super().has_permission(request, view) and (
request.user.is_superuser or len(request.user.organizations_managed) > 0
)

def validate_membership(self, user, org):
return org and (user.is_superuser or user.is_manager(org))

Expand All @@ -67,11 +85,17 @@ class IsOrganizationOwner(BaseOrganizationPermission):
'requested resource belongs.'
)

def has_permission(self, request, view):
# User must be owner of atleast one organization.
return super().has_permission(request, view) and (
request.user.is_superuser or len(request.user.organizations_owned) > 0
)

def validate_membership(self, user, org):
return org and (user.is_superuser or user.is_owner(org))


class DjangoModelPermissions(BaseDjangoModelPermissions):
class DjangoModelPermissions(ObjectOrganizationMixin, BaseDjangoModelPermissions):
perms_map = {
'GET': ['%(app_label)s.view_%(model_name)s'],
'OPTIONS': [],
Expand All @@ -81,6 +105,7 @@ class DjangoModelPermissions(BaseDjangoModelPermissions):
'PATCH': ['%(app_label)s.change_%(model_name)s'],
'DELETE': ['%(app_label)s.delete_%(model_name)s'],
}
READ_ONLY_METHOD = ['GET', 'HEAD']

def has_permission(self, request, view):
# Workaround to ensure DjangoModelPermissions are not applied
Expand All @@ -98,5 +123,27 @@ def has_permission(self, request, view):

if request.method == 'GET':
return user.has_perms(perms) or user.has_perms(change_perm)

return user.has_perms(perms)

def has_object_permission(self, request, view, obj):
"""
Controls access to objects that are shared between organizations.
Allow access to only READ_ONLY_METHOD for non-superusers.
"""
if request.user and request.user.is_superuser:
# Superusers will have access to all methods.
return True
try:
organization = self.get_object_organization(view, obj)
except AttributeError:
# The object does not have an organization field. Therefore,
# these tests are not applicable to it.
return True
if organization is None:
if request.method not in self.READ_ONLY_METHOD:
return False
else:
queryset = self._queryset(view)
perms = self.get_required_permissions(request.method, queryset.model)
return request.user.has_perms(perms)
return True
133 changes: 132 additions & 1 deletion tests/testapp/tests/test_permission_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def test_superuser(self):
self.assertEqual(response.status_code, 200)

def test_base_org_perm_fails(self):
admin = self._get_admin()
admin = self._get_operator()
token = self._obtain_auth_token(username=admin)
self.client.force_login(admin)
auth = dict(HTTP_AUTHORIZATION=f'Bearer {token}')
Expand Down Expand Up @@ -230,3 +230,134 @@ def test_view_django_model_permission_with_change_perm(self):
reverse('test_template_detail', args=[t1.pk]), **auth
)
self.assertEqual(response.status_code, 200)

def _test_access_shared_object(
self, token, expected_templates_count=1, expected_status_codes={}
):
auth = dict(HTTP_AUTHORIZATION=f'Bearer {token}')
template = self._create_template(organization=None)

with self.subTest('Test listing templates'):
response = self.client.get(reverse('test_template_list'), **auth)
data = response.data.copy()
# Only check "templates" in response.
if isinstance(data, dict):
data.pop('detail', None)
self.assertEqual(response.status_code, expected_status_codes['list'])
self.assertEqual(len(data), expected_templates_count)

with self.subTest('Test creating template'):
response = self.client.post(
reverse('test_template_list'),
data={'name': 'Test Template', 'organization': None},
content_type='application/json',
**auth,
)
self.assertEqual(response.status_code, expected_status_codes['create'])
if expected_status_codes['create'] == 400:
self.assertEqual(
str(response.data['organization'][0]), 'This field may not be null.'
)

with self.subTest('Test retreiving template'):
response = self.client.get(
reverse('test_template_detail', args=[template.id]), **auth
)
self.assertEqual(response.status_code, expected_status_codes['retrieve'])

with self.subTest('Test updating template'):
response = self.client.put(
reverse('test_template_detail', args=[template.id]),
data={'name': 'Name changed'},
content_type='application/json',
**auth,
)
self.assertEqual(response.status_code, expected_status_codes['update'])

with self.subTest('Test deleting template'):
response = self.client.delete(
reverse('test_template_detail', args=[template.id]), **auth
)
self.assertEqual(response.status_code, expected_status_codes['delete'])

with self.subTest('Test HEAD and OPTION methods'):
response = self.client.head(reverse('test_template_list'), **auth)
self.assertEqual(response.status_code, expected_status_codes['head'])

response = self.client.options(reverse('test_template_list'), **auth)
self.assertEqual(response.status_code, expected_status_codes['option'])

def test_superuser_access_shared_object(self):
superuser = self._get_admin()
token = self._obtain_auth_token(username=superuser)
self._test_access_shared_object(
token,
expected_status_codes={
'create': 201,
'list': 200,
'retrieve': 200,
'update': 200,
'delete': 204,
'head': 200,
'option': 200,
},
)

def test_org_manager_access_shared_object(self):
org_manager = self._create_administrator()
token = self._obtain_auth_token(username=org_manager)
# First user is automatically owner, so created dummy
# user to keep operator as manager only.
self._create_org_user(user=self._get_user(), is_admin=True)
self._create_org_user(user=org_manager, is_admin=True)
self._test_access_shared_object(
token,
expected_status_codes={
'create': 400,
'list': 200,
'retrieve': 200,
'update': 403,
'delete': 403,
'head': 200,
'option': 200,
},
)

def test_org_owner_access_shared_object(self):
# The first admin of an organization automatically
# becomes organization owner.
org_owner = self._create_administrator(organizations=[self._get_org()])
token = self._obtain_auth_token(username=org_owner)
self._test_access_shared_object(
token,
expected_status_codes={
'create': 400,
'list': 200,
'retrieve': 200,
'update': 403,
'delete': 403,
'head': 200,
'option': 200,
},
)

def test_org_user_access_shared_object(self):
# The test uses a user with operator permissions,
# because Django's model permission will deny permissions
# to the view and this test won't test "shared object" permissions.
user = self._create_administrator()
token = self._obtain_auth_token(username=user)
self._create_org_user(user=user, is_admin=False)
self._test_access_shared_object(
token,
expected_templates_count=0,
expected_status_codes={
'create': 400,
'list': 200,
'retrieve': 404,
'update': 404,
'delete': 404,
'head': 200,
'option': 200,
},
)

0 comments on commit 3777960

Please sign in to comment.