diff --git a/kolibri/core/api.py b/kolibri/core/api.py index 4ab72b97e86..5c02843ecf8 100644 --- a/kolibri/core/api.py +++ b/kolibri/core/api.py @@ -6,7 +6,6 @@ from rest_framework import viewsets from rest_framework.decorators import action from rest_framework.filters import OrderingFilter -from rest_framework.generics import get_object_or_404 from rest_framework.mixins import CreateModelMixin as BaseCreateModelMixin from rest_framework.mixins import DestroyModelMixin from rest_framework.mixins import UpdateModelMixin as BaseUpdateModelMixin @@ -214,27 +213,6 @@ def _get_lookup_filter(self): return {self.lookup_field: self.kwargs[lookup_url_kwarg]} - def _get_object_from_queryset(self, queryset): - """ - Returns the object the view is displaying. - We override this to remove the DRF default behaviour - of filtering the queryset. - (rtibbles) There doesn't seem to be a use case for - querying a detail endpoint and also filtering by query - parameters that might result in a 404. - """ - # Perform the lookup filtering. - filter_kwargs = self._get_lookup_filter() - obj = get_object_or_404(queryset, **filter_kwargs) - - # May raise a permission denied - self.check_object_permissions(self.request, obj) - - return obj - - def get_object(self): - return self._get_object_from_queryset(self.get_queryset()) - def annotate_queryset(self, queryset): return queryset @@ -259,10 +237,10 @@ def serialize(self, queryset): ) def serialize_object(self, **filter_kwargs): - queryset = self.get_queryset() try: filter_kwargs = filter_kwargs or self._get_lookup_filter() - return self.serialize(queryset.filter(**filter_kwargs))[0] + queryset = self.get_queryset().filter(**filter_kwargs) + return self.serialize(self.filter_queryset(queryset))[0] except IndexError: raise Http404( "No %s matches the given query." % queryset.model._meta.object_name diff --git a/kolibri/core/auth/api.py b/kolibri/core/auth/api.py index 53781291c03..c2f316a91bf 100644 --- a/kolibri/core/auth/api.py +++ b/kolibri/core/auth/api.py @@ -99,10 +99,8 @@ class KolibriAuthPermissionsFilter(filters.BaseFilterBackend): """ def filter_queryset(self, request, queryset, view): - if request.method == "GET" and request.resolver_match.url_name.endswith( - "-list" - ): - # only filter down the queryset in the case of the list view being requested + if request.method == "GET": + # If a 'GET' method only return readable items to filter down the queryset. return request.user.filter_readable(queryset) # otherwise, return the full queryset, as permission checks will happen object-by-object # (and filtering here then leads to 404's instead of the more correct 403's) diff --git a/kolibri/core/auth/csv_utils.py b/kolibri/core/auth/csv_utils.py index 32679b6f0c4..da03d386ed0 100644 --- a/kolibri/core/auth/csv_utils.py +++ b/kolibri/core/auth/csv_utils.py @@ -17,6 +17,7 @@ from kolibri.core.auth.models import FacilityUser from kolibri.core.query import SQCount from kolibri.core.utils.csv import open_csv_for_writing +from kolibri.core.utils.csv import output_mapper logger = logging.getLogger(__name__) @@ -96,14 +97,7 @@ def replace_multiple_classrooms(field, obj): ) -def map_output(obj): - mapped_obj = {} - for header, label in labels.items(): - if header in output_mappings and header in obj: - mapped_obj[label] = output_mappings[header](obj) - elif header in obj: - mapped_obj[label] = obj[header] - return mapped_obj +map_output = partial(output_mapper, labels=labels, output_mappings=output_mappings) input_fields = ( diff --git a/kolibri/core/auth/management/commands/bulkexportusers.py b/kolibri/core/auth/management/commands/bulkexportusers.py index fa46da78c71..be83f4703ed 100644 --- a/kolibri/core/auth/management/commands/bulkexportusers.py +++ b/kolibri/core/auth/management/commands/bulkexportusers.py @@ -27,6 +27,7 @@ from kolibri.core.tasks.management.commands.base import AsyncCommand from kolibri.core.tasks.utils import get_current_job from kolibri.core.utils.csv import open_csv_for_writing +from kolibri.core.utils.csv import output_mapper from kolibri.utils import conf try: @@ -107,14 +108,7 @@ def kind_of_roles(field, obj): } -def map_output(obj): - mapped_obj = {} - for header, label in labels.items(): - if header in output_mappings and header in obj: - mapped_obj[label] = output_mappings[header](obj) - elif header in obj: - mapped_obj[label] = obj[header] - return mapped_obj +map_output = partial(output_mapper, labels=labels, output_mappings=output_mappings) def translate_labels(): diff --git a/kolibri/core/auth/models.py b/kolibri/core/auth/models.py index 2b7bc19c3eb..69afd2df2ab 100644 --- a/kolibri/core/auth/models.py +++ b/kolibri/core/auth/models.py @@ -55,16 +55,14 @@ from .errors import UserIsNotFacilityUser from .errors import UserIsNotMemberError from .permissions.auth import AllCanReadFacilityDataset -from .permissions.auth import AnonUserCanReadFacilities +from .permissions.auth import AnyUserCanReadFacilities from .permissions.auth import CoachesCanManageGroupsForTheirClasses from .permissions.auth import CoachesCanManageMembershipsForTheirGroups from .permissions.auth import CollectionSpecificRoleBasedPermissions from .permissions.auth import FacilityAdminCanEditForOwnFacilityDataset -from .permissions.auth import MembersCanReadMembershipsOfTheirCollections from .permissions.base import BasePermissions from .permissions.base import RoleBasedPermissions from .permissions.general import IsAdminForOwnFacility -from .permissions.general import IsFromSameFacility from .permissions.general import IsOwn from .permissions.general import IsSelf from kolibri.core.auth.constants.demographics import choices as GENDER_CHOICES @@ -932,9 +930,8 @@ class Collection(AbstractFacilityDataModel): # Furthermore, no FacilityUser can create or delete a Facility. Permission to create a collection is governed # by roles in relation to the new collection's parent collection (see CollectionSpecificRoleBasedPermissions). permissions = ( - IsFromSameFacility(read_only=True) - | CollectionSpecificRoleBasedPermissions() - | AnonUserCanReadFacilities() + CollectionSpecificRoleBasedPermissions() + | AnyUserCanReadFacilities() | CoachesCanManageGroupsForTheirClasses() ) @@ -1130,9 +1127,7 @@ class Membership(AbstractFacilityDataModel): ) # Membership can be written by coaches under the coaches' group membership = CoachesCanManageMembershipsForTheirGroups() - # Members can read memberships of collections they are members of - own_collections = MembersCanReadMembershipsOfTheirCollections() - permissions = own | role | membership | own_collections + permissions = own | role | membership user = models.ForeignKey( "FacilityUser", related_name="memberships", blank=False, null=False diff --git a/kolibri/core/auth/permissions/auth.py b/kolibri/core/auth/permissions/auth.py index c3cdad3e68e..9068fa18500 100644 --- a/kolibri/core/auth/permissions/auth.py +++ b/kolibri/core/auth/permissions/auth.py @@ -1,7 +1,6 @@ """ The permissions classes in this module define the specific permissions that govern access to the models in the auth app. """ -from django.contrib.auth.models import AnonymousUser from django.db.models import Q from ..constants.collection_kinds import ADHOCLEARNERSGROUP @@ -28,6 +27,7 @@ def __init__(self): can_be_read_by=(ADMIN, COACH), can_be_updated_by=(ADMIN,), can_be_deleted_by=None, + collection_field=".", ) def user_can_create_object(self, user, obj): @@ -55,28 +55,19 @@ def user_can_delete_object(self, user, obj): CollectionSpecificRoleBasedPermissions, self ).user_can_update_object(user, obj.parent) - def readable_by_user_filter(self, user): - if user.is_anonymous(): - return q_none - - # By default can read all collections in facility - return Q(dataset_id=user.dataset_id) - -class AnonUserCanReadFacilities(DenyAll): +class AnyUserCanReadFacilities(DenyAll): """ - Permissions class that allows reading the object if user is anonymous. + Permissions class that allows reading the object for any user. """ def user_can_read_object(self, user, obj): if obj.kind == FACILITY: - return isinstance(user, AnonymousUser) + return True return False def readable_by_user_filter(self, user): - if isinstance(user, AnonymousUser): - return Q(kind=FACILITY) - return q_none + return Q(kind=FACILITY) class FacilityAdminCanEditForOwnFacilityDataset(BasePermissions): @@ -196,24 +187,3 @@ def user_can_delete_object(self, user, obj): def readable_by_user_filter(self, user): return q_none - - -class MembersCanReadMembershipsOfTheirCollections(BasePermissions): - def user_can_create_object(self, user, obj): - return False - - def user_can_read_object(self, user, obj): - return user.is_member_of(obj.collection) - - def user_can_update_object(self, user, obj): - return False - - def user_can_delete_object(self, user, obj): - return False - - def readable_by_user_filter(self, user): - if user.is_anonymous(): - return q_none - # Add a special case where users with memberships in the same collection - # can also read memberships for other members - return Q(collection_id__in=user.memberships.all().values("collection_id")) diff --git a/kolibri/core/auth/permissions/base.py b/kolibri/core/auth/permissions/base.py index 38a1a8ff4f6..3d7c7908574 100644 --- a/kolibri/core/auth/permissions/base.py +++ b/kolibri/core/auth/permissions/base.py @@ -100,6 +100,7 @@ def __init__( :param tuple can_be_updated_by: a tuple of role kinds that should give a user permission to update the object :param tuple can_be_deleted_by: a tuple of role kinds that should give a user permission to delete the object :param str collection_field: the name of the field through which collections can be identified for the object. + (or "." if the object itself is the collection) :param is_syncable: Boolean indicating whether the model is a syncable model, if it is, we can use dataset_id to do quick filtering in some cases, if not, then we need to use the target_field as the source of the dataset_id. """ @@ -108,7 +109,17 @@ def __init__( self.can_be_updated_by = can_be_updated_by self.can_be_deleted_by = can_be_deleted_by self.target_field = target_field - self.collection_field = collection_field + if collection_field == ".": + # Process special keyword '.' to allow this class to be used with Collection objects, particularly the + # readably by user filter, where the collection is the object itself, so we point at the id, + # and for parent collections to its parent. + self.collection_field = "id" + self.parent_collection_field = "parent" + else: + # Otherwise, we just set the collection field to the value passed in (which defaults to "collection") + # and set the parent field as "__parent" to point to the parent of the FKed collection. + self.collection_field = collection_field + self.parent_collection_field = "{}__parent".format(self.collection_field) self.is_syncable = is_syncable def _get_target_object(self, obj): @@ -207,7 +218,7 @@ def readable_by_user_filter(self, user): # which collection an object is associated with. q_filter = Q( Q(**{"{}__in".format(self.collection_field): collection_ids}) - | Q(**{"{}__parent__in".format(self.collection_field): collection_ids}) + | Q(**{"{}__in".format(self.parent_collection_field): collection_ids}) ) # Also filter by the parents of collections, so that objects associated with LearnerGroup # or AdHocGroups will also be readable by those with coach permissions on the parent Classroom diff --git a/kolibri/core/auth/permissions/general.py b/kolibri/core/auth/permissions/general.py index 6a20eebf494..addcd2a39c4 100644 --- a/kolibri/core/auth/permissions/general.py +++ b/kolibri/core/auth/permissions/general.py @@ -114,35 +114,6 @@ def readable_by_user_filter(self, user): return Q(**{self.field_name: user.id}) -class IsFromSameFacility(BasePermissions): - """ - Permissions class that only allows access to object if user is associated with the same facility as the object. - """ - - def __init__(self, field_name=".", read_only=False): - self.read_only = read_only - - def _facility_dataset_is_same(self, user, obj): - return hasattr(user, "dataset") and user.dataset == obj.dataset - - def user_can_create_object(self, user, obj): - return (not self.read_only) and self._facility_dataset_is_same(user, obj) - - def user_can_read_object(self, user, obj): - return self._facility_dataset_is_same(user, obj) - - def user_can_update_object(self, user, obj): - return (not self.read_only) and self._facility_dataset_is_same(user, obj) - - def user_can_delete_object(self, user, obj): - return (not self.read_only) and self._facility_dataset_is_same(user, obj) - - def readable_by_user_filter(self, user): - if hasattr(user, "dataset"): - return Q(dataset=user.dataset) - return q_none - - def _user_is_admin_for_own_facility(user, obj=None): # import here to avoid circular imports diff --git a/kolibri/core/auth/test/test_api.py b/kolibri/core/auth/test/test_api.py index e537273b091..08e4c240a99 100644 --- a/kolibri/core/auth/test/test_api.py +++ b/kolibri/core/auth/test/test_api.py @@ -81,8 +81,9 @@ def setUpTestData(cls): cls.learner_groups += [ LearnerGroupFactory.create(parent=classroom) for _ in range(5) ] + cls.user = FacilityUserFactory.create(facility=cls.facility) - def setUp(self): + def login_superuser(self): self.client.login( username=self.superuser.username, password=DUMMY_PASSWORD, @@ -90,6 +91,7 @@ def setUp(self): ) def test_learnergroup_list(self): + self.login_superuser() response = self.client.get( reverse("kolibri:core:learnergroup-list"), format="json" ) @@ -110,7 +112,35 @@ def test_learnergroup_list(self): self.assertItemsEqual(group.pop("user_ids"), expected[i].pop("user_ids")) self.assertItemsEqual(response.data, expected) + def test_learnergroup_list_user(self): + self.client.login( + username=self.user.username, + password=DUMMY_PASSWORD, + facility=self.facility, + ) + response = self.client.get( + reverse("kolibri:core:learnergroup-list"), format="json" + ) + expected = [] + self.assertItemsEqual(response.data, expected) + + def test_learnergroup_list_user_parent_filter(self): + self.client.login( + username=self.user.username, + password=DUMMY_PASSWORD, + facility=self.facility, + ) + response = self.client.get( + reverse("kolibri:core:learnergroup-list") + + "?parent=" + + self.classrooms[0].id, + format="json", + ) + expected = [] + self.assertItemsEqual(response.data, expected) + def test_learnergroup_detail(self): + self.login_superuser() response = self.client.get( reverse( "kolibri:core:learnergroup-detail", @@ -126,7 +156,23 @@ def test_learnergroup_detail(self): } self.assertItemsEqual(response.data, expected) + def test_learnergroup_detail_user(self): + self.client.login( + username=self.user.username, + password=DUMMY_PASSWORD, + facility=self.facility, + ) + response = self.client.get( + reverse( + "kolibri:core:learnergroup-detail", + kwargs={"pk": self.learner_groups[0].id}, + ), + format="json", + ) + self.assertEqual(response.status_code, 404) + def test_parent_in_queryparam_with_one_id(self): + self.login_superuser() classroom_id = self.classrooms[0].id response = self.client.get( reverse("kolibri:core:learnergroup-list"), @@ -152,6 +198,7 @@ def test_parent_in_queryparam_with_one_id(self): self.assertItemsEqual(response.data, expected) def test_cannot_create_learnergroup_same_name(self): + self.login_superuser() classroom_id = self.classrooms[0].id learner_group_name = ( models.LearnerGroup.objects.filter(parent_id=classroom_id).first().name @@ -165,6 +212,7 @@ def test_cannot_create_learnergroup_same_name(self): self.assertEqual(response.data[0]["id"], error_constants.UNIQUE) def test_cannot_create_learnergroup_no_classroom_parent(self): + self.login_superuser() classroom_id = self.classrooms[0].id learner_group_id = ( models.LearnerGroup.objects.filter(parent_id=classroom_id).first().id @@ -187,8 +235,9 @@ def setUpTestData(cls): ClassroomFactory.create(parent=cls.facility) for _ in range(10) ] cls.learner_group = LearnerGroupFactory.create(parent=cls.classrooms[0]) + cls.user = FacilityUserFactory.create(facility=cls.facility) - def setUp(self): + def login_superuser(self): self.client.login( username=self.superuser.username, password=DUMMY_PASSWORD, @@ -196,6 +245,7 @@ def setUp(self): ) def test_classroom_list(self): + self.login_superuser() response = self.client.get( reverse("kolibri:core:classroom-list"), format="json" ) @@ -213,7 +263,31 @@ def test_classroom_list(self): ] self.assertItemsEqual(response.data, expected) + def test_classroom_list_user(self): + self.client.login( + username=self.user.username, + password=DUMMY_PASSWORD, + facility=self.facility, + ) + response = self.client.get( + reverse("kolibri:core:classroom-list"), format="json" + ) + self.assertItemsEqual(response.data, []) + + def test_classroom_list_user_parent_filter(self): + self.client.login( + username=self.user.username, + password=DUMMY_PASSWORD, + facility=self.facility, + ) + response = self.client.get( + reverse("kolibri:core:classroom-list") + "?parent=" + self.facility.id, + format="json", + ) + self.assertItemsEqual(response.data, []) + def test_classroom_detail(self): + self.login_superuser() response = self.client.get( reverse( "kolibri:core:classroom-detail", kwargs={"pk": self.classrooms[0].id} @@ -229,7 +303,22 @@ def test_classroom_detail(self): } self.assertDictEqual(response.data, expected) + def test_classroom_detail_user(self): + self.client.login( + username=self.user.username, + password=DUMMY_PASSWORD, + facility=self.facility, + ) + response = self.client.get( + reverse( + "kolibri:core:classroom-detail", kwargs={"pk": self.classrooms[0].id} + ), + format="json", + ) + self.assertEqual(response.status_code, 404) + def test_classroom_detail_assigned_coach_super_user(self): + self.login_superuser() self.classrooms[0].add_coach(self.superuser) response = self.client.get( reverse( @@ -264,6 +353,7 @@ def test_classroom_detail_assigned_coach_super_user(self): self.assertDictEqual(response.data, expected) def test_classroom_detail_assigned_coach_admin(self): + self.login_superuser() admin = FacilityUserFactory.create(facility=self.facility) self.facility.add_admin(admin) self.classrooms[0].add_coach(admin) @@ -298,6 +388,7 @@ def test_classroom_detail_assigned_coach_admin(self): self.assertDictEqual(response.data, expected) def test_classroom_facility_coach_role_for_filter(self): + self.login_superuser() coach = FacilityUserFactory.create(facility=self.facility) self.facility.add_coach(coach) response = self.client.get( @@ -309,6 +400,7 @@ def test_classroom_facility_coach_role_for_filter(self): self.assertEqual(len(response.data), len(self.classrooms)) def test_cannot_create_classroom_same_name(self): + self.login_superuser() classroom_name = self.classrooms[0].name response = self.client.post( reverse("kolibri:core:classroom-list"), @@ -319,6 +411,7 @@ def test_cannot_create_classroom_same_name(self): self.assertEqual(response.data[0]["id"], error_constants.UNIQUE) def test_cannot_create_classroom_no_facility_parent(self): + self.login_superuser() classroom_id = self.classrooms[0].id response = self.client.post( reverse("kolibri:core:classroom-list"), @@ -710,14 +803,12 @@ def setUpTestData(cls): cls.facility.add_admin(cls.superuser) cls.user = FacilityUserFactory.create(facility=cls.facility) - def setUp(self): + def test_user_list(self): self.client.login( username=self.superuser.username, password=DUMMY_PASSWORD, facility=self.facility, ) - - def test_user_list(self): response = self.client.get(reverse("kolibri:core:facilityuser-list")) self.assertEqual(response.status_code, 200) self.assertItemsEqual( @@ -754,6 +845,66 @@ def test_user_list(self): ], ) + def test_user_list_self(self): + self.client.login( + username=self.user.username, + password=DUMMY_PASSWORD, + facility=self.facility, + ) + response = self.client.get(reverse("kolibri:core:facilityuser-list")) + self.assertEqual(response.status_code, 200) + self.assertItemsEqual( + response.data, + [ + { + "id": self.user.id, + "username": self.user.username, + "full_name": self.user.full_name, + "facility": self.user.facility_id, + "id_number": self.user.id_number, + "gender": self.user.gender, + "birth_year": self.user.birth_year, + "is_superuser": False, + "roles": [], + }, + ], + ) + + def test_anonymous_user_list(self): + response = self.client.get(reverse("kolibri:core:facilityuser-list")) + self.assertEqual(response.status_code, 200) + self.assertItemsEqual( + response.data, + [], + ) + + def test_user_no_retrieve_admin(self): + self.client.login( + username=self.user.username, + password=DUMMY_PASSWORD, + facility=self.facility, + ) + response = self.client.get( + reverse( + "kolibri:core:facilityuser-detail", kwargs={"pk": self.superuser.id} + ) + ) + self.assertEqual(response.status_code, 404) + + def test_anonymous_no_retrieve_admin(self): + response = self.client.get( + reverse( + "kolibri:core:facilityuser-detail", kwargs={"pk": self.superuser.id} + ) + ) + self.assertEqual(response.status_code, 404) + + def test_anonymous_no_retrieve_user(self): + response = self.client.get( + reverse("kolibri:core:facilityuser-detail", kwargs={"pk": self.user.id}) + ) + self.assertEqual(response.status_code, 404) + class FacilityUserFilterTestCase(APITestCase): @classmethod @@ -1175,7 +1326,7 @@ def test_for_incompatible_settings_only_one(self): self.assertEqual(response.status_code, 400) -class MembershipCascadeDeletion(APITestCase): +class MembershipAPITestCase(APITestCase): @classmethod def setUpTestData(cls): provision_device() @@ -1193,14 +1344,83 @@ def setUpTestData(cls): models.Membership.objects.create(collection=cls.classroom, user=cls.other_user) models.Membership.objects.create(collection=cls.lg, user=cls.other_user) - def setUp(self): + def login_superuser(self): self.client.login( username=self.superuser.username, password=DUMMY_PASSWORD, facility=self.facility, ) + def test_user_list_own(self): + self.client.login( + username=self.user.username, + password=DUMMY_PASSWORD, + facility=self.facility, + ) + response = self.client.get(reverse("kolibri:core:membership-list")) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data), 2) + for membership in response.data: + self.assertEqual(membership["user"], self.user.id) + + def test_other_user_list_own(self): + self.client.login( + username=self.other_user.username, + password=DUMMY_PASSWORD, + facility=self.facility, + ) + response = self.client.get(reverse("kolibri:core:membership-list")) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data), 2) + for membership in response.data: + self.assertEqual(membership["user"], self.other_user.id) + + def test_superuser_list_all(self): + self.login_superuser() + response = self.client.get(reverse("kolibri:core:membership-list")) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data), 4) + + def test_user_retrieve_own(self): + self.client.login( + username=self.user.username, + password=DUMMY_PASSWORD, + facility=self.facility, + ) + response = self.client.get( + reverse( + "kolibri:core:membership-detail", + kwargs={"pk": self.classroom_membership.id}, + ) + ) + self.assertEqual(response.status_code, 200) + + def test_user_retrieve_other(self): + self.client.login( + username=self.other_user.username, + password=DUMMY_PASSWORD, + facility=self.facility, + ) + response = self.client.get( + reverse( + "kolibri:core:membership-detail", + kwargs={"pk": self.classroom_membership.id}, + ) + ) + self.assertEqual(response.status_code, 404) + + def test_superuser_retrieve_other(self): + self.login_superuser() + response = self.client.get( + reverse( + "kolibri:core:membership-detail", + kwargs={"pk": self.classroom_membership.id}, + ) + ) + self.assertEqual(response.status_code, 200) + def test_delete_classroom_membership(self): + self.login_superuser() url = reverse("kolibri:core:membership-list") + "?user={}&collection={}".format( self.user.id, self.classroom.id ) @@ -1209,6 +1429,7 @@ def test_delete_classroom_membership(self): self.assertFalse(models.Membership.objects.filter(user=self.user).exists()) def test_delete_detail(self): + self.login_superuser() response = self.client.delete( reverse( "kolibri:core:membership-detail", @@ -1219,6 +1440,7 @@ def test_delete_detail(self): self.assertFalse(models.Membership.objects.filter(user=self.user).exists()) def test_delete_does_not_affect_other_user_memberships(self): + self.login_superuser() expected_count = models.Membership.objects.filter(user=self.other_user).count() self.client.delete( reverse( diff --git a/kolibri/core/auth/test/test_permissions.py b/kolibri/core/auth/test/test_permissions.py index 58866d04a83..1e8a52631cd 100644 --- a/kolibri/core/auth/test/test_permissions.py +++ b/kolibri/core/auth/test/test_permissions.py @@ -207,7 +207,7 @@ def test_facility_users_can_read_own_facility(self): self.assertTrue(user.can_read(own_facility)) self.assertIn(own_facility, user.filter_readable(Facility.objects.all())) - def test_facility_users_cannot_read_other_facility(self): + def test_facility_users_can_read_other_facility(self): """ FacilityUsers cannot read other Facilities, regardless of their roles """ other_facility = self.data2["facility"] for user in [ @@ -216,10 +216,8 @@ def test_facility_users_cannot_read_other_facility(self): self.data1["learners_one_group"][0][0], self.data1["unattached_users"][0], ]: - self.assertFalse(user.can_read(other_facility)) - self.assertNotIn( - other_facility, user.filter_readable(Facility.objects.all()) - ) + self.assertTrue(user.can_read(other_facility)) + self.assertIn(other_facility, user.filter_readable(Facility.objects.all())) def test_anon_users_can_read_facility(self): """ KolibriAnonymousUser can now read Facility objects """ @@ -319,31 +317,46 @@ def test_only_facility_admin_can_create_classroom(self): self.assertFalse(self.anon_user.can_create(Classroom, new_classroom_data)) def test_members_can_read_own_classroom(self): - """ Members of a Classroom can read that Classroom, as can coaches and admins for the Classroom """ + """ Coaches and admins for the Classroom can read the classroom """ for user in [ self.data["facility_admin"], self.data["facility_coach"], self.own_classroom_coach, - self.member, ]: self.assertTrue(user.can_read(self.own_classroom)) self.assertIn( self.own_classroom, user.filter_readable(Classroom.objects.all()) ) - def test_members_and_admins_and_coaches_can_read_other_classroom(self): + def test_members_cannot_read_own_classroom(self): + """ Members of a Classroom cannot read that Classroom""" + self.assertFalse(self.member.can_read(self.own_classroom)) + self.assertNotIn( + self.own_classroom, self.member.filter_readable(Classroom.objects.all()) + ) + + def test_admins_and_facility_coaches_can_read_other_classroom(self): """ Members and admins/coaches for a Classroom can read another Classroom """ for user in [ self.data["facility_admin"], self.data["facility_coach"], - self.own_classroom_coach, - self.member, ]: self.assertTrue(user.can_read(self.other_classroom)) self.assertIn( self.other_classroom, user.filter_readable(Classroom.objects.all()) ) + def test_members_and_class_coaches_cannot_read_other_classroom(self): + """ Members and admins/coaches for a Classroom can read another Classroom """ + for user in [ + self.own_classroom_coach, + self.member, + ]: + self.assertFalse(user.can_read(self.other_classroom)) + self.assertNotIn( + self.other_classroom, user.filter_readable(Classroom.objects.all()) + ) + def test_only_admins_can_update_own_classroom(self): """ The only FacilityUsers who can update a Classroom are admins for that Classroom (or for the Facility) """ self.assertTrue(self.data["facility_admin"].can_update(self.own_classroom)) @@ -429,18 +442,25 @@ def test_facility_admins_or_coaches_or_classroom_coach_can_create_learnergroup( self.assertFalse(self.anon_user.can_create(LearnerGroup, new_learnergroup_data)) def test_members_can_read_own_learnergroup(self): - """ Members of a LearnerGroup can read that LearnerGroup, as can coaches and admins for the LearnerGroup """ + """ Coaches and admins for the LearnerGroup can read the learner group""" for user in [ self.data["facility_admin"], self.data["facility_coach"], self.own_classroom_coach, - self.member, ]: self.assertTrue(user.can_read(self.own_learnergroup)) self.assertIn( self.own_learnergroup, user.filter_readable(LearnerGroup.objects.all()) ) + def test_members_cannot_read_own_learnergroup(self): + """ Members of a LearnerGroup cannot read that LearnerGroup""" + self.assertFalse(self.member.can_read(self.own_learnergroup)) + self.assertNotIn( + self.own_learnergroup, + self.member.filter_readable(LearnerGroup.objects.all()), + ) + def test_admins_or_coach_can_update_own_learnergroup(self): """ The only FacilityUsers who can update a LearnerGroup are admins for that LearnerGroup """ self.assertTrue(self.data["facility_admin"].can_update(self.own_learnergroup)) diff --git a/kolibri/core/auth/test/test_user_export.py b/kolibri/core/auth/test/test_user_export.py index 1b355ccb8ea..a4205932578 100644 --- a/kolibri/core/auth/test/test_user_export.py +++ b/kolibri/core/auth/test/test_user_export.py @@ -43,6 +43,12 @@ "gender": DEFERRED, "password": "password", }, + { + "username": "=2+5+cmd|' /C calc'!A0", + "birth_year": "2000", + "gender": DEFERRED, + "password": "password", + }, ] @@ -73,6 +79,8 @@ def test_csv_export_with_demographics(self): self.assertEqual(row[labels["facility__id"]], facility.id) self.assertEqual(row[labels["memberships__collection__name"]], "") self.assertEqual(row[labels["memberships__collection__id"]], "") + elif "2+5+cmd" in row[labels["username"]]: + self.assertEqual(row[labels["username"]], "'=2+5+cmd\\|' /C calc'!A0") self.assertEqual(len(results), expected_count) for demo_field in DEMO_FIELDS: diff --git a/kolibri/core/logger/csv_export.py b/kolibri/core/logger/csv_export.py index 1986d05d10d..691708be921 100644 --- a/kolibri/core/logger/csv_export.py +++ b/kolibri/core/logger/csv_export.py @@ -7,6 +7,7 @@ import math import os from collections import OrderedDict +from functools import partial from django.core.cache import cache from django.http import Http404 @@ -24,6 +25,7 @@ from kolibri.core.content.models import ChannelMetadata from kolibri.core.content.models import ContentNode from kolibri.core.utils.csv import open_csv_for_writing +from kolibri.core.utils.csv import output_mapper from kolibri.utils import conf @@ -86,15 +88,7 @@ def cache_content_title(obj): ) ) - -def map_object(obj): - mapped_obj = {} - for header, label in labels.items(): - if header in mappings: - mapped_obj[label] = mappings[header](obj) - elif header in obj: - mapped_obj[label] = obj[header] - return mapped_obj +map_object = partial(output_mapper, labels=labels, output_mappings=mappings) classes_info = { diff --git a/kolibri/core/utils/csv.py b/kolibri/core/utils/csv.py index c71b9c696af..99a48cceb12 100644 --- a/kolibri/core/utils/csv.py +++ b/kolibri/core/utils/csv.py @@ -1,5 +1,7 @@ import io +import re import sys +from numbers import Number def open_csv_for_writing(filepath): @@ -12,3 +14,34 @@ def open_csv_for_reading(filepath): if sys.version_info[0] < 3: return io.open(filepath, "rb") return io.open(filepath, "r", newline="", encoding="utf-8-sig") + + +negative_number_regex = re.compile("^-?[0-9,\\.]+$") +csv_injection_chars = {"@", "+", "-", "=", "|", "%"} + + +def sanitize(value): + if value is None or isinstance(value, Number): + return value + + value = str(value) + if ( + value + and value[0] in csv_injection_chars + and not negative_number_regex.match(value) + ): + value = value.replace("|", "\\|") + value = "'" + value + return value + + +def output_mapper(obj, labels=None, output_mappings=None): + mapped_obj = {} + labels = labels or {} + output_mappings = output_mappings or {} + for header, label in labels.items(): + if header in output_mappings and header in obj: + mapped_obj[label] = sanitize(output_mappings[header](obj)) + elif header in obj: + mapped_obj[label] = sanitize(obj[header]) + return mapped_obj diff --git a/kolibri/plugins/learn/viewsets.py b/kolibri/plugins/learn/viewsets.py index a5726d8d3ed..1aa5fa7fcd1 100644 --- a/kolibri/plugins/learn/viewsets.py +++ b/kolibri/plugins/learn/viewsets.py @@ -9,7 +9,6 @@ from rest_framework.views import APIView from kolibri.core.api import ReadOnlyValuesViewset -from kolibri.core.auth.api import KolibriAuthPermissionsFilter from kolibri.core.auth.models import Classroom from kolibri.core.auth.models import Facility from kolibri.core.content.api import ContentNodeProgressViewset @@ -98,7 +97,6 @@ class LearnerClassroomViewset(ReadOnlyValuesViewset): along with all associated assignments. """ - filter_backends = (KolibriAuthPermissionsFilter,) permission_classes = (IsAuthenticated,) values = ("id", "name") diff --git a/requirements/base.txt b/requirements/base.txt index 8f907e0d5f6..5e7b1f49ecb 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -30,3 +30,4 @@ ifcfg==0.21 Click==7.0 whitenoise==4.1.4 idna==2.8 +ifaddr==0.1.7 # Pin as version 0.2.0 only supports Python 3.7 and above