diff --git a/README.rst b/README.rst index f97a676a..b71676f5 100644 --- a/README.rst +++ b/README.rst @@ -808,6 +808,12 @@ Usage example: permission_classes = (DjangoModelPermissions,) queryset = Template.objects.all() +**Note:** ``DjangoModelPermissions`` allows users who +are either organization managers or owners to view +shared objects in read only mode. + +Standard users will not be able to view or list shared objects. + Django REST Framework Mixins ---------------------------- diff --git a/openwisp_users/api/mixins.py b/openwisp_users/api/mixins.py index aa6875c1..5a06342d 100644 --- a/openwisp_users/api/mixins.py +++ b/openwisp_users/api/mixins.py @@ -8,10 +8,25 @@ class OrgLookup: + @property + def org_field(self): + return getattr(self, 'organization_field', 'organization') + @property def organization_lookup(self): - org_field = getattr(self, 'organization_field', 'organization') - return f'{org_field}__in' + return f'{self.org_field}__in' + + +class SharedObjectsLookup: + @property + def queryset_organization_conditions(self): + conditions = super().queryset_organization_conditions + organizations = getattr(self.request.user, self._user_attr) + # If user has access to any organization, then include shared + # objects in the queryset. + if len(organizations): + conditions |= Q(**{f'{self.org_field}__isnull': True}) + return conditions class FilterByOrganization(OrgLookup): @@ -26,6 +41,12 @@ class FilterByOrganization(OrgLookup): def _user_attr(self): raise NotImplementedError() + @property + def queryset_organization_conditions(self): + return Q( + **{self.organization_lookup: getattr(self.request.user, self._user_attr)} + ) + def get_queryset(self): qs = super().get_queryset() if self.request.user.is_superuser: @@ -35,9 +56,7 @@ def get_queryset(self): def get_organization_queryset(self, qs): if self.request.user.is_anonymous: return - return qs.filter( - **{self.organization_lookup: getattr(self.request.user, self._user_attr)} - ) + return qs.filter(self.queryset_organization_conditions) class FilterByOrganizationMembership(FilterByOrganization): @@ -48,7 +67,7 @@ class FilterByOrganizationMembership(FilterByOrganization): _user_attr = 'organizations_dict' -class FilterByOrganizationManaged(FilterByOrganization): +class FilterByOrganizationManaged(SharedObjectsLookup, FilterByOrganization): """ Filter queryset by organizations managed by user """ @@ -56,7 +75,7 @@ class FilterByOrganizationManaged(FilterByOrganization): _user_attr = 'organizations_managed' -class FilterByOrganizationOwned(FilterByOrganization): +class FilterByOrganizationOwned(SharedObjectsLookup, FilterByOrganization): """ Filter queryset by organizations owned by user """ diff --git a/openwisp_users/api/permissions.py b/openwisp_users/api/permissions.py index 04c4c6ff..e8ee6096 100644 --- a/openwisp_users/api/permissions.py +++ b/openwisp_users/api/permissions.py @@ -8,21 +8,14 @@ Organization = load_model('openwisp_users', 'Organization') -class BaseOrganizationPermission(BasePermission): - def has_object_permission(self, request, view, obj): - organization = self.get_object_organization(view, obj) - return self.validate_membership(request.user, organization) - - def has_permission(self, request, view): - return request.user and request.user.is_authenticated - +class ObjectOrganizationMixin(object): def get_object_organization(self, view, obj): organization_field = getattr(view, 'organization_field', 'organization') fields = organization_field.split('__') accessed_object = obj for field in fields: - accessed_object = getattr(accessed_object, field, None) - if not accessed_object: + accessed_object = getattr(accessed_object, field, False) + if accessed_object is False: raise AttributeError( _( 'Organization not found, `organization_field` ' @@ -31,6 +24,25 @@ def get_object_organization(self, view, obj): ) return accessed_object + +class BaseOrganizationPermission(ObjectOrganizationMixin, BasePermission): + def has_object_permission(self, request, view, obj): + # Superuser bypasses organization permission checks + if request.user.is_superuser: + return True + organization = self.get_object_organization(view, obj) + if organization is None: + # User should be allowed access to shared objects only if + # they are manager or owner of atleast one organization. + return ( + len(request.user.organizations_managed) >= 1 + or len(request.user.organizations_owned) >= 1 + ) + return self.validate_membership(request.user, organization) + + def has_permission(self, request, view): + return request.user and request.user.is_authenticated + def validate_membership(self, user, org): raise NotImplementedError( _( @@ -57,6 +69,12 @@ class IsOrganizationManager(BaseOrganizationPermission): 'requested resource belongs.' ) + def has_permission(self, request, view): + # User must be manager of atleast one organization. + return super().has_permission(request, view) and ( + request.user.is_superuser or len(request.user.organizations_managed) > 0 + ) + def validate_membership(self, user, org): return org and (user.is_superuser or user.is_manager(org)) @@ -67,11 +85,17 @@ class IsOrganizationOwner(BaseOrganizationPermission): 'requested resource belongs.' ) + def has_permission(self, request, view): + # User must be owner of atleast one organization. + return super().has_permission(request, view) and ( + request.user.is_superuser or len(request.user.organizations_owned) > 0 + ) + def validate_membership(self, user, org): return org and (user.is_superuser or user.is_owner(org)) -class DjangoModelPermissions(BaseDjangoModelPermissions): +class DjangoModelPermissions(ObjectOrganizationMixin, BaseDjangoModelPermissions): perms_map = { 'GET': ['%(app_label)s.view_%(model_name)s'], 'OPTIONS': [], @@ -81,6 +105,7 @@ class DjangoModelPermissions(BaseDjangoModelPermissions): 'PATCH': ['%(app_label)s.change_%(model_name)s'], 'DELETE': ['%(app_label)s.delete_%(model_name)s'], } + READ_ONLY_METHOD = ['GET', 'HEAD'] def has_permission(self, request, view): # Workaround to ensure DjangoModelPermissions are not applied @@ -98,5 +123,27 @@ def has_permission(self, request, view): if request.method == 'GET': return user.has_perms(perms) or user.has_perms(change_perm) - return user.has_perms(perms) + + def has_object_permission(self, request, view, obj): + """ + Controls access to objects that are shared between organizations. + Allow access to only READ_ONLY_METHOD for non-superusers. + """ + if request.user and request.user.is_superuser: + # Superusers will have access to all methods. + return True + try: + organization = self.get_object_organization(view, obj) + except AttributeError: + # The object does not have an organization field. Therefore, + # these tests are not applicable to it. + return True + if organization is None: + if request.method not in self.READ_ONLY_METHOD: + return False + else: + queryset = self._queryset(view) + perms = self.get_required_permissions(request.method, queryset.model) + return request.user.has_perms(perms) + return True diff --git a/tests/testapp/tests/test_permission_classes.py b/tests/testapp/tests/test_permission_classes.py index c8206a93..72ccc9ee 100644 --- a/tests/testapp/tests/test_permission_classes.py +++ b/tests/testapp/tests/test_permission_classes.py @@ -101,7 +101,7 @@ def test_superuser(self): self.assertEqual(response.status_code, 200) def test_base_org_perm_fails(self): - admin = self._get_admin() + admin = self._get_operator() token = self._obtain_auth_token(username=admin) self.client.force_login(admin) auth = dict(HTTP_AUTHORIZATION=f'Bearer {token}') @@ -230,3 +230,134 @@ def test_view_django_model_permission_with_change_perm(self): reverse('test_template_detail', args=[t1.pk]), **auth ) self.assertEqual(response.status_code, 200) + + def _test_access_shared_object( + self, token, expected_templates_count=1, expected_status_codes={} + ): + auth = dict(HTTP_AUTHORIZATION=f'Bearer {token}') + template = self._create_template(organization=None) + + with self.subTest('Test listing templates'): + response = self.client.get(reverse('test_template_list'), **auth) + data = response.data.copy() + # Only check "templates" in response. + if isinstance(data, dict): + data.pop('detail', None) + self.assertEqual(response.status_code, expected_status_codes['list']) + self.assertEqual(len(data), expected_templates_count) + + with self.subTest('Test creating template'): + response = self.client.post( + reverse('test_template_list'), + data={'name': 'Test Template', 'organization': None}, + content_type='application/json', + **auth, + ) + self.assertEqual(response.status_code, expected_status_codes['create']) + if expected_status_codes['create'] == 400: + self.assertEqual( + str(response.data['organization'][0]), 'This field may not be null.' + ) + + with self.subTest('Test retreiving template'): + response = self.client.get( + reverse('test_template_detail', args=[template.id]), **auth + ) + self.assertEqual(response.status_code, expected_status_codes['retrieve']) + + with self.subTest('Test updating template'): + response = self.client.put( + reverse('test_template_detail', args=[template.id]), + data={'name': 'Name changed'}, + content_type='application/json', + **auth, + ) + self.assertEqual(response.status_code, expected_status_codes['update']) + + with self.subTest('Test deleting template'): + response = self.client.delete( + reverse('test_template_detail', args=[template.id]), **auth + ) + self.assertEqual(response.status_code, expected_status_codes['delete']) + + with self.subTest('Test HEAD and OPTION methods'): + response = self.client.head(reverse('test_template_list'), **auth) + self.assertEqual(response.status_code, expected_status_codes['head']) + + response = self.client.options(reverse('test_template_list'), **auth) + self.assertEqual(response.status_code, expected_status_codes['option']) + + def test_superuser_access_shared_object(self): + superuser = self._get_admin() + token = self._obtain_auth_token(username=superuser) + self._test_access_shared_object( + token, + expected_status_codes={ + 'create': 201, + 'list': 200, + 'retrieve': 200, + 'update': 200, + 'delete': 204, + 'head': 200, + 'option': 200, + }, + ) + + def test_org_manager_access_shared_object(self): + org_manager = self._create_administrator() + token = self._obtain_auth_token(username=org_manager) + # First user is automatically owner, so created dummy + # user to keep operator as manager only. + self._create_org_user(user=self._get_user(), is_admin=True) + self._create_org_user(user=org_manager, is_admin=True) + self._test_access_shared_object( + token, + expected_status_codes={ + 'create': 400, + 'list': 200, + 'retrieve': 200, + 'update': 403, + 'delete': 403, + 'head': 200, + 'option': 200, + }, + ) + + def test_org_owner_access_shared_object(self): + # The first admin of an organization automatically + # becomes organization owner. + org_owner = self._create_administrator(organizations=[self._get_org()]) + token = self._obtain_auth_token(username=org_owner) + self._test_access_shared_object( + token, + expected_status_codes={ + 'create': 400, + 'list': 200, + 'retrieve': 200, + 'update': 403, + 'delete': 403, + 'head': 200, + 'option': 200, + }, + ) + + def test_org_user_access_shared_object(self): + # The test uses a user with operator permissions, + # because Django's model permission will deny permissions + # to the view and this test won't test "shared object" permissions. + user = self._create_administrator() + token = self._obtain_auth_token(username=user) + self._create_org_user(user=user, is_admin=False) + self._test_access_shared_object( + token, + expected_templates_count=0, + expected_status_codes={ + 'create': 400, + 'list': 200, + 'retrieve': 404, + 'update': 404, + 'delete': 404, + 'head': 200, + 'option': 200, + }, + )