diff --git a/README.rst b/README.rst index 4ef08b5b..c5a4f857 100644 --- a/README.rst +++ b/README.rst @@ -194,6 +194,18 @@ is enabled or not. It is disabled by default because `OpenWISP `_ does not use this feature of `django-organizations `_ yet. +Multitenancy mixins +------------------- + +* **MultitenantAdminMixin**: adding this mixin to a ``ModelAdmin`` class will make it multitenant. + Set ``multitenant_shared_relations`` to the list of parameters you wish to have only organization + specific options. + +* **MultitenantOrgFilter**: admin filter that shows only organizations the current user is associated with in its available choices. + +* **MultitenantRelatedOrgFilter**: similar ``MultitenantOrgFilter`` but shows only objects which have a relation with + one of the organizations the current user is associated with. + Contributing ------------ diff --git a/openwisp_users/multitenancy.py b/openwisp_users/multitenancy.py new file mode 100644 index 00000000..5fd1bd3b --- /dev/null +++ b/openwisp_users/multitenancy.py @@ -0,0 +1,102 @@ +from django.contrib import admin +from django.db.models import Q +from django.utils.translation import ugettext_lazy as _ + + +class MultitenantAdminMixin(object): + """ + Mixin that makes a ModelAdmin class multitenant: + users will see only the objects related to the organizations + they are associated with. + """ + multitenant_shared_relations = [] + multitenant_parent = None + + def __init__(self, *args, **kwargs): + super(MultitenantAdminMixin, self).__init__(*args, **kwargs) + parent = self.multitenant_parent + shared_relations = self.multitenant_shared_relations + if parent and parent not in shared_relations: + self.multitenant_shared_relations.append(parent) + + def get_repr(self, obj): + return str(obj) + + get_repr.short_description = _('name') + + def get_queryset(self, request): + """ + If current user is not superuser, show only the + objects associated to organizations he/she is associated with + """ + qs = super(MultitenantAdminMixin, self).get_queryset(request) + user = request.user + if user.is_superuser: + return qs + if hasattr(self.model, 'organization'): + return qs.filter(organization__in=user.organizations_pk) + elif not self.multitenant_parent: + return qs + else: + qsarg = '{0}__organization__in'.format(self.multitenant_parent) + return qs.filter(**{qsarg: user.organizations_pk}) + + def _edit_form(self, request, form): + """ + Modifies the form querysets as follows; + if current user is not superuser: + * show only relevant organizations + * show only relations associated to relevant organizations + or shared relations + else show everything + """ + fields = form.base_fields + if not request.user.is_superuser: + orgs_pk = request.user.organizations_pk + # organizations relation; + # may be readonly and not present in field list + if 'organization' in fields: + org_field = fields['organization'] + org_field.queryset = org_field.queryset.filter(pk__in=orgs_pk) + # other relations + q = Q(organization__in=orgs_pk) | Q(organization=None) + for field_name in self.multitenant_shared_relations: + # each relation may be readonly + # and not present in field list + if field_name not in fields: + continue + field = fields[field_name] + field.queryset = field.queryset.filter(q) + + def get_form(self, request, obj=None, **kwargs): + form = super(MultitenantAdminMixin, self).get_form(request, obj, **kwargs) + self._edit_form(request, form) + return form + + def get_formset(self, request, obj=None, **kwargs): + formset = super(MultitenantAdminMixin, self).get_formset(request, obj=None, **kwargs) + self._edit_form(request, formset.form) + return formset + + +class MultitenantOrgFilter(admin.RelatedFieldListFilter): + """ + Admin filter that shows only organizations the current + user is associated with in its available choices + """ + multitenant_lookup = 'pk__in' + + def field_choices(self, field, request, model_admin): + if request.user.is_superuser: + return super(MultitenantOrgFilter, self).field_choices(field, request, model_admin) + organizations = request.user.organizations_pk + return field.get_choices(include_blank=False, + limit_choices_to={self.multitenant_lookup: organizations}) + + +class MultitenantRelatedOrgFilter(MultitenantOrgFilter): + """ + Admin filter that shows only objects which have a relation with + one of the organizations the current user is associated with + """ + multitenant_lookup = 'organization__in' diff --git a/openwisp_users/tests/utils.py b/openwisp_users/tests/utils.py index a9981ec0..a3453925 100644 --- a/openwisp_users/tests/utils.py +++ b/openwisp_users/tests/utils.py @@ -1,7 +1,91 @@ +from django.contrib.auth import get_user_model from django.contrib.auth.models import Permission +from django.db.models import Q +from django.urls import reverse from ..models import Organization, OrganizationOwner, OrganizationUser, User +user_model = get_user_model() + + +class TestMultitenantAdminMixin(object): + def setUp(self): + user_model.objects.create_superuser(username='admin', + password='tester', + email='admin@admin.com') + + def _login(self, username='admin', password='tester'): + self.client.login(username=username, password=password) + + def _logout(self): + self.client.logout() + + operator_permission_filters = [] + + def get_operator_permissions(self): + filters = Q() + for filter in self.operator_permission_filters: + filters = filters | Q(**filter) + return Permission.objects.filter(filters) + + def _create_operator(self, organizations=[]): + operator = user_model.objects.create_user(username='operator', + password='tester', + email='operator@test.com', + is_staff=True) + operator.user_permissions.add(*self.get_operator_permissions()) + for organization in organizations: + OrganizationUser.objects.create(user=operator, organization=organization) + return operator + + def _test_multitenant_admin(self, url, visible, hidden, select_widget=False): + """ + reusable test function that ensures different users + can see the right objects. + an operator with limited permissions will not be able + to see the elements contained in ``hidden``, while + a superuser can see everything. + """ + self._login(username='operator', password='tester') + response = self.client.get(url) + + # utility format function + def _f(el, select_widget=False): + if select_widget: + return '{0}'.format(el) + return el + + # ensure elements in visible list are visible to operator + for el in visible: + self.assertContains(response, _f(el, select_widget), + msg_prefix='[operator contains]') + # ensure elements in hidden list are not visible to operator + for el in hidden: + self.assertNotContains(response, _f(el, select_widget), + msg_prefix='[operator not-contains]') + + # now become superuser + self._logout() + self._login(username='admin', password='tester') + response = self.client.get(url) + # ensure all elements are visible to superuser + all_elements = visible + hidden + for el in all_elements: + self.assertContains(response, _f(el, select_widget), + msg_prefix='[superuser contains]') + + def _test_changelist_recover_deleted(self, app_label, model_label): + self._test_multitenant_admin( + url=reverse('admin:{0}_{1}_changelist'.format(app_label, model_label)), + visible=[], + hidden=['Recover deleted'] + ) + + def _test_recoverlist_operator_403(self, app_label, model_label): + self._login(username='operator', password='tester') + response = self.client.get(reverse('admin:{0}_{1}_recoverlist'.format(app_label, model_label))) + self.assertEqual(response.status_code, 403) + class TestOrganizationMixin(object): def _create_user(self, **kwargs): diff --git a/runtests.py b/runtests.py index 15d10479..fc0f3a83 100755 --- a/runtests.py +++ b/runtests.py @@ -13,4 +13,5 @@ args.insert(1, "test") args.insert(2, "openwisp_users") args.insert(3, "testapp.tests") + args.insert(4, "testapp.test_multitenancy") execute_from_command_line(args) diff --git a/tests/testapp/__init__.py b/tests/testapp/__init__.py index e69de29b..87258ebb 100644 --- a/tests/testapp/__init__.py +++ b/tests/testapp/__init__.py @@ -0,0 +1,17 @@ +class CreateMixin(object): + def _create_book(self, **kwargs): + options = dict(name='test-book', + author='test-author') + options.update(kwargs) + b = self.book_model(**options) + b.full_clean() + b.save() + return b + + def _create_shelf(self, **kwargs): + options = dict(name='test-shelf') + options.update(kwargs) + s = self.shelf_model(**options) + s.full_clean() + s.save() + return s diff --git a/tests/testapp/admin.py b/tests/testapp/admin.py new file mode 100644 index 00000000..c4f9f6af --- /dev/null +++ b/tests/testapp/admin.py @@ -0,0 +1,48 @@ +from django.contrib import admin +from openwisp_users.multitenancy import (MultitenantAdminMixin, + MultitenantOrgFilter, + MultitenantRelatedOrgFilter) + +from .models import Book, Shelf + + +class BaseAdmin(MultitenantAdminMixin, admin.ModelAdmin): + pass + + +class ShelfAdmin(BaseAdmin): + list_display = ['name', 'organization'] + list_filter = [('organization', MultitenantOrgFilter)] + fields = ['name', 'organization', 'created', 'modified'] + + +class BookAdmin(BaseAdmin): + list_display = ['name', 'author', 'organization', 'shelf'] + list_filter = [('organization', MultitenantOrgFilter), + ('shelf', MultitenantRelatedOrgFilter)] + fields = ['name', 'author', 'organization', 'shelf', 'created', 'modified'] + multitenant_shared_relations = ['shelf'] + + def change_view(self, request, object_id, form_url='', extra_context=None): + extra_context = extra_context or {} + extra_context.update({ + 'additional_buttons': [ + { + 'type': 'button', + 'url': 'DUMMY', + 'class': 'previewbook', + 'value': 'Preview book', + }, + { + 'type': 'button', + 'url': 'DUMMY', + 'class': 'downloadbook', + 'value': 'Download book', + } + ] + }) + return super(BookAdmin, self).change_view(request, object_id, form_url, extra_context) + + +admin.site.register(Shelf, ShelfAdmin) +admin.site.register(Book, BookAdmin) diff --git a/tests/testapp/apps.py b/tests/testapp/apps.py new file mode 100644 index 00000000..501ed6ac --- /dev/null +++ b/tests/testapp/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class TestAppConfig(AppConfig): + name = 'testapp' + label = 'testapp' diff --git a/tests/testapp/migrations/0003_multitenancy.py b/tests/testapp/migrations/0003_multitenancy.py new file mode 100644 index 00000000..885f31e3 --- /dev/null +++ b/tests/testapp/migrations/0003_multitenancy.py @@ -0,0 +1,53 @@ +# Generated by Django 2.1.3 on 2018-11-29 21:44 + +from django.db import migrations, models +import django.db.models.deletion +import django.utils.timezone +import model_utils.fields +import openwisp_users.mixins +import uuid + + +class Migration(migrations.Migration): + + dependencies = [ + ('openwisp_users', '0004_default_groups'), + ('testapp', '0002_config_template'), + ] + + operations = [ + migrations.CreateModel( + name='Book', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, editable=False, verbose_name='created')), + ('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, editable=False, verbose_name='modified')), + ('name', models.CharField(max_length=64, verbose_name='name')), + ('author', models.CharField(max_length=64, verbose_name='author')), + ('organization', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='openwisp_users.Organization', verbose_name='organization')), + ], + options={ + 'abstract': False, + }, + bases=(openwisp_users.mixins.ValidateOrgMixin, models.Model), + ), + migrations.CreateModel( + name='Shelf', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, editable=False, verbose_name='created')), + ('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, editable=False, verbose_name='modified')), + ('name', models.CharField(max_length=64, verbose_name='name')), + ('organization', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='openwisp_users.Organization', verbose_name='organization')), + ], + options={ + 'abstract': False, + }, + bases=(openwisp_users.mixins.ValidateOrgMixin, models.Model), + ), + migrations.AddField( + model_name='book', + name='shelf', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='testapp.Shelf'), + ), + ] diff --git a/tests/testapp/models.py b/tests/testapp/models.py index fa28fb6e..fa3503ec 100644 --- a/tests/testapp/models.py +++ b/tests/testapp/models.py @@ -1,5 +1,8 @@ +from django.core.exceptions import ValidationError from django.db import models +from django.utils.translation import ugettext_lazy as _ from openwisp_users.mixins import OrgMixin, ShareableOrgMixin +from openwisp_utils.base import TimeStampedEditableModel class Template(ShareableOrgMixin): @@ -13,3 +16,30 @@ class Config(OrgMixin): def clean(self): self._validate_org_relation('template') + + +class Shelf(OrgMixin, TimeStampedEditableModel): + name = models.CharField(_('name'), max_length=64) + + def __str__(self): + return self.name + + class Meta: + abstract = False + + def clean(self): + if self.name == "Intentional_Test_Fail": + raise ValidationError('Intentional_Test_Fail') + return self + + +class Book(OrgMixin, TimeStampedEditableModel): + name = models.CharField(_('name'), max_length=64) + author = models.CharField(_('author'), max_length=64) + shelf = models.ForeignKey('testapp.Shelf', on_delete=models.CASCADE) + + def __str__(self): + return self.name + + class Meta: + abstract = False diff --git a/tests/testapp/test_multitenancy.py b/tests/testapp/test_multitenancy.py new file mode 100644 index 00000000..2f75b1e4 --- /dev/null +++ b/tests/testapp/test_multitenancy.py @@ -0,0 +1,89 @@ +from django.test import TestCase +from django.urls import reverse +from openwisp_users.tests.utils import (TestMultitenantAdminMixin, + TestOrganizationMixin) + +from . import CreateMixin +from .models import Book, Shelf + + +class TestMultitenancy(CreateMixin, TestMultitenantAdminMixin, + TestOrganizationMixin, TestCase): + book_model = Book + shelf_model = Shelf + operator_permission_filter = [ + {'codename__endswith': 'book'}, + {'codename__endswith': 'shelf'}, + ] + + def _create_multitenancy_test_env(self): + org1 = self._create_org(name='org1') + org2 = self._create_org(name='org2') + inactive = self._create_org(name='inactive-org', is_active=False) + operator = self._create_operator(organizations=[org1, inactive]) + s1 = self._create_shelf(name='shell1', organization=org1) + s2 = self._create_shelf(name='shell2', organization=org2) + s3 = self._create_shelf(name='shell3', organization=inactive) + b1 = self._create_book(name='book1', organization=org1, shelf=s1) + b2 = self._create_book(name='book2', organization=org2, shelf=s2) + b3 = self._create_book(name='book3', organization=inactive, shelf=s3) + data = dict(s1=s1, s2=s2, s3_inactive=s3, + b1=b1, b2=b2, b3_inactive=b3, + org1=org1, org2=org2, + inactive=inactive, + operator=operator) + return data + + def test_shelf_queryset(self): + data = self._create_multitenancy_test_env() + self._test_multitenant_admin( + url=reverse('admin:testapp_shelf_changelist'), + visible=[data['s1'].name, data['org1'].name], + hidden=[data['s2'].name, data['org2'].name, + data['s3_inactive'].name] + ) + + def test_shelf_organization_fk_queryset(self): + data = self._create_multitenancy_test_env() + self._test_multitenant_admin( + url=reverse('admin:testapp_shelf_add'), + visible=[data['org1'].name], + hidden=[data['org2'].name, data['inactive']], + select_widget=True + ) + + def test_book_queryset(self): + data = self._create_multitenancy_test_env() + self._test_multitenant_admin( + url=reverse('admin:testapp_book_changelist'), + visible=[data['b1'].name, data['org1'].name], + hidden=[data['b2'].name, data['org2'].name, + data['b3_inactive'].name] + ) + + def test_book_organization_fk_queryset(self): + data = self._create_multitenancy_test_env() + self._test_multitenant_admin( + url=reverse('admin:testapp_book_add'), + visible=[data['org1'].name], + hidden=[data['org2'].name, data['inactive']], + select_widget=True + ) + + def test_book_shelf_filter(self): + data = self._create_multitenancy_test_env() + s_special = self._create_shelf(name='special', organization=data['org1']) + self._test_multitenant_admin( + url=reverse('admin:testapp_book_changelist'), + visible=[data['s1'].name, s_special.name], + hidden=[data['s2'].name, data['s3_inactive'].name] + ) + + def test_book_shelf_fk_queryset(self): + data = self._create_multitenancy_test_env() + self._test_multitenant_admin( + url=reverse('admin:testapp_book_add'), + visible=[data['s1'].name], + hidden=[data['s2'].name, data['s3_inactive'].name], + select_widget=True + )