diff --git a/openedx/core/djangoapps/content_libraries/api/libraries.py b/openedx/core/djangoapps/content_libraries/api/libraries.py
index 8d32e4dbc015..66281addb643 100644
--- a/openedx/core/djangoapps/content_libraries/api/libraries.py
+++ b/openedx/core/djangoapps/content_libraries/api/libraries.py
@@ -53,13 +53,11 @@
from django.db import IntegrityError, transaction
from django.db.models import Q, QuerySet
from django.utils.translation import gettext as _
-from opaque_keys.edx.locator import (
- LibraryLocatorV2,
- LibraryUsageLocatorV2,
-)
-from openedx_events.content_authoring.data import (
- ContentLibraryData,
-)
+from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2
+from openedx_authz import api as authz_api
+from openedx_authz.api import assign_role_to_user_in_scope
+from openedx_authz.constants import permissions as authz_permissions
+from openedx_events.content_authoring.data import ContentLibraryData
from openedx_events.content_authoring.signals import (
CONTENT_LIBRARY_CREATED,
CONTENT_LIBRARY_DELETED,
@@ -70,7 +68,6 @@
from organizations.models import Organization
from user_tasks.models import UserTaskArtifact, UserTaskStatus
from xblock.core import XBlock
-from openedx_authz.api import assign_role_to_user_in_scope
from openedx.core.types import User as UserType
@@ -78,6 +75,7 @@
from ..constants import ALL_RIGHTS_RESERVED
from ..models import ContentLibrary, ContentLibraryPermission
from .exceptions import LibraryAlreadyExists, LibraryPermissionIntegrityError
+from .permissions import LEGACY_LIB_PERMISSIONS
log = logging.getLogger(__name__)
@@ -109,6 +107,7 @@
"revert_changes",
"get_backup_task_status",
"assign_library_role_to_user",
+ "user_has_permission_across_lib_authz_systems",
]
@@ -245,7 +244,18 @@ def user_can_create_library(user: AbstractUser) -> bool:
"""
Check if the user has permission to create a content library.
"""
- return user.has_perm(permissions.CAN_CREATE_CONTENT_LIBRARY)
+ library_permission = permissions.CAN_CREATE_CONTENT_LIBRARY
+ lib_permission_in_authz = _transform_legacy_lib_permission_to_authz_permission(library_permission)
+ # The authz_api.is_user_allowed check only validates permissions within a specific library context. Since
+ # creating a library is not tied to an existing one, we use user.has_perm (via Bridgekeeper) to check if the user
+ # can create libraries, meaning they have the course creator role. In the future, this should rely on a global (*)
+ # role defined in the Authorization Framework for instance-level resource creation.
+ has_perms = user.has_perm(library_permission) or authz_api.is_user_allowed(
+ user,
+ lib_permission_in_authz,
+ authz_api.data.GLOBAL_SCOPE_WILDCARD,
+ )
+ return has_perms
def get_libraries_for_user(user, org=None, text_search=None, order=None) -> QuerySet[ContentLibrary]:
@@ -267,7 +277,11 @@ def get_libraries_for_user(user, org=None, text_search=None, order=None) -> Quer
Q(learning_package__description__icontains=text_search)
)
- filtered = permissions.perms[permissions.CAN_VIEW_THIS_CONTENT_LIBRARY].filter(user, qs)
+ # Using distinct() temporarily to avoid duplicate results caused by overlapping permission checks
+ # between Bridgekeeper and the new authorization framework. This ensures correct results for now,
+ # but it should be removed once Bridgekeeper support is fully dropped and all permission logic
+ # is handled through openedx-authz.
+ filtered = permissions.perms[permissions.CAN_VIEW_THIS_CONTENT_LIBRARY].filter(user, qs).distinct()
if order:
order_query = 'learning_package__'
@@ -332,7 +346,7 @@ def require_permission_for_library_key(library_key: LibraryLocatorV2, user: User
library_obj = ContentLibrary.objects.get_by_key(library_key)
# obj should be able to read any valid model object but mypy thinks it can only be
# "User | AnonymousUser | None"
- if not user.has_perm(permission, obj=library_obj): # type:ignore[arg-type]
+ if not user_has_permission_across_lib_authz_systems(user, permission, library_obj):
raise PermissionDenied
return library_obj
@@ -750,3 +764,90 @@ def get_backup_task_status(
result['file'] = artifact.file
return result
+
+
+def _transform_legacy_lib_permission_to_authz_permission(permission: str) -> str:
+ """
+ Transform a legacy content library permission to an openedx-authz permission.
+ """
+ # There is no dedicated permission or role for can_create_content_library in openedx-authz yet,
+ # so we reuse the same permission to rely on user.has_perm via Bridgekeeper.
+ return {
+ permissions.CAN_CREATE_CONTENT_LIBRARY: permissions.CAN_CREATE_CONTENT_LIBRARY,
+ permissions.CAN_DELETE_THIS_CONTENT_LIBRARY: authz_permissions.DELETE_LIBRARY.identifier,
+ permissions.CAN_EDIT_THIS_CONTENT_LIBRARY: authz_permissions.EDIT_LIBRARY_CONTENT.identifier,
+ permissions.CAN_EDIT_THIS_CONTENT_LIBRARY_TEAM: authz_permissions.MANAGE_LIBRARY_TEAM.identifier,
+ permissions.CAN_VIEW_THIS_CONTENT_LIBRARY: authz_permissions.VIEW_LIBRARY.identifier,
+ permissions.CAN_VIEW_THIS_CONTENT_LIBRARY_TEAM: authz_permissions.VIEW_LIBRARY_TEAM.identifier,
+ }.get(permission, permission)
+
+
+def _transform_authz_permission_to_legacy_lib_permission(permission: str) -> str:
+ """
+ Transform an openedx-authz permission to a legacy content library permission.
+ """
+ return {
+ authz_permissions.PUBLISH_LIBRARY_CONTENT.identifier: permissions.CAN_EDIT_THIS_CONTENT_LIBRARY,
+ authz_permissions.CREATE_LIBRARY_COLLECTION.identifier: permissions.CAN_EDIT_THIS_CONTENT_LIBRARY,
+ authz_permissions.EDIT_LIBRARY_COLLECTION.identifier: permissions.CAN_EDIT_THIS_CONTENT_LIBRARY,
+ authz_permissions.DELETE_LIBRARY_COLLECTION.identifier: permissions.CAN_EDIT_THIS_CONTENT_LIBRARY,
+ }.get(permission, permission)
+
+
+def user_has_permission_across_lib_authz_systems(
+ user: UserType,
+ permission: str | authz_api.data.PermissionData,
+ library_obj: ContentLibrary,
+) -> bool:
+ """
+ Check whether a user has a given permission on a content library across both the
+ legacy edx-platform permission system and the newer openedx-authz system.
+
+ The provided permission name is normalized to both systems (legacy and authz), and
+ authorization is granted if either:
+ - the user holds the legacy object-level permission on the ContentLibrary instance, or
+ - the openedx-authz API allows the user for the corresponding permission on the library.
+
+ **Note:**
+ Temporary: this function uses Bridgekeeper-based logic for cases not yet modeled in openedx-authz.
+
+ Current gaps covered here:
+ - CAN_CREATE_CONTENT_LIBRARY: we call user.has_perm via Bridgekeeper to verify the user is a course creator.
+ - CAN_VIEW_THIS_CONTENT_LIBRARY: we respect the allow_public_read flag via Bridgekeeper.
+
+ Replace these with authz_api.is_user_allowed once openedx-authz supports
+ these conditions natively (including global (*) roles).
+
+ Args:
+ user: The Django user (or user-like object) to check.
+ permission: The permission identifier (either a legacy codename or an openedx-authz name).
+ library_obj: The ContentLibrary instance to check against.
+
+ Returns:
+ bool: True if the user is authorized by either system; otherwise False.
+ """
+ if isinstance(permission, authz_api.data.PermissionData):
+ permission = permission.identifier
+ if _is_legacy_permission(permission):
+ legacy_permission = permission
+ authz_permission = _transform_legacy_lib_permission_to_authz_permission(permission)
+ else:
+ authz_permission = permission
+ legacy_permission = _transform_authz_permission_to_legacy_lib_permission(permission)
+ return (
+ # Check both the legacy and the new openedx-authz permissions
+ user.has_perm(perm=legacy_permission, obj=library_obj)
+ or authz_api.is_user_allowed(
+ user,
+ authz_permission,
+ str(library_obj.library_key),
+ )
+ )
+
+
+def _is_legacy_permission(permission: str) -> bool:
+ """
+ Determine if the specified library permission is part of the legacy
+ or the new openedx-authz system.
+ """
+ return permission in LEGACY_LIB_PERMISSIONS
diff --git a/openedx/core/djangoapps/content_libraries/api/permissions.py b/openedx/core/djangoapps/content_libraries/api/permissions.py
index 6064b80d6f9e..5b8bd4ba7e1a 100644
--- a/openedx/core/djangoapps/content_libraries/api/permissions.py
+++ b/openedx/core/djangoapps/content_libraries/api/permissions.py
@@ -12,3 +12,13 @@
CAN_VIEW_THIS_CONTENT_LIBRARY,
CAN_VIEW_THIS_CONTENT_LIBRARY_TEAM
)
+
+LEGACY_LIB_PERMISSIONS = frozenset({
+ CAN_CREATE_CONTENT_LIBRARY,
+ CAN_DELETE_THIS_CONTENT_LIBRARY,
+ CAN_EDIT_THIS_CONTENT_LIBRARY,
+ CAN_EDIT_THIS_CONTENT_LIBRARY_TEAM,
+ CAN_LEARN_FROM_THIS_CONTENT_LIBRARY,
+ CAN_VIEW_THIS_CONTENT_LIBRARY,
+ CAN_VIEW_THIS_CONTENT_LIBRARY_TEAM,
+})
diff --git a/openedx/core/djangoapps/content_libraries/permissions.py b/openedx/core/djangoapps/content_libraries/permissions.py
index 4e72381986ed..c3a8b68c947c 100644
--- a/openedx/core/djangoapps/content_libraries/permissions.py
+++ b/openedx/core/djangoapps/content_libraries/permissions.py
@@ -2,8 +2,12 @@
Permissions for Content Libraries (v2, Learning-Core-based)
"""
from bridgekeeper import perms, rules
-from bridgekeeper.rules import Attribute, ManyRelation, Relation, blanket_rule, in_current_groups
+from bridgekeeper.rules import Attribute, ManyRelation, Relation, blanket_rule, in_current_groups, Rule
from django.conf import settings
+from django.db.models import Q
+
+from openedx_authz import api as authz_api
+from openedx_authz.constants.permissions import VIEW_LIBRARY
from openedx.core.djangoapps.content_libraries.models import ContentLibraryPermission
@@ -54,6 +58,154 @@ def is_course_creator(user):
return get_course_creator_status(user) == 'granted'
+
+class HasPermissionInContentLibraryScope(Rule):
+ """Bridgekeeper rule that checks content library permissions via the openedx-authz system.
+
+ This rule integrates the openedx-authz authorization system (backed by Casbin) with
+ Bridgekeeper's declarative permission system. It checks if a user has been granted a
+ specific permission (action) through their role assignments in the authorization system.
+
+ The rule works by:
+ 1. Querying the authorization system to find library scopes where the user has this permission
+ 2. Parsing the library keys (org/slug) from the scopes
+ 3. Building database filters to match ContentLibrary models with those org/slug combinations
+
+ Attributes:
+ permission (PermissionData): The permission object representing the action to check
+ (e.g., 'view', 'edit'). This is used to look up scopes in the authorization system.
+
+ filter_keys (list[str]): The Django model fields to use when building QuerySet filters.
+ Defaults to ['org', 'slug'] for ContentLibrary models.
+
+ These fields are used to construct the Q object filters that match libraries
+ based on the parsed components from library keys in authorization scopes.
+
+ For ContentLibrary, library keys have the format 'lib:ORG:SLUG', which maps to:
+ - 'org' -> filters on org__short_name (related Organization model)
+ - 'slug' -> filters on slug field
+
+ If filtering by different fields is needed, pass a custom list. For example:
+ - ['org', 'slug'] - default for ContentLibrary (filters by org and slug)
+ - ['id'] - filter by primary key (for other models)
+
+ Examples:
+ Basic usage with default filter_keys:
+ >>> from bridgekeeper import perms
+ >>> from openedx.core.djangoapps.content_libraries.permissions import HasPermissionInContentLibraryScope
+ >>>
+ >>> # Uses default filter_keys=['org', 'slug'] for ContentLibrary
+ >>> can_view = HasPermissionInContentLibraryScope('view_library')
+ >>> perms['libraries.view_library'] = can_view
+
+ Compound permissions with boolean operators:
+ >>> from bridgekeeper.rules import Attribute
+ >>>
+ >>> is_active = Attribute('is_active', True)
+ >>> is_staff = Attribute('is_staff', True)
+ >>> can_view = HasPermissionInContentLibraryScope('view_library')
+ >>>
+ >>> # User must be active AND (staff OR have explicit permission)
+ >>> perms['libraries.view_library'] = is_active & (is_staff | can_view)
+
+ QuerySet filtering (efficient, database-level):
+ >>> from openedx.core.djangoapps.content_libraries.models import ContentLibrary
+ >>>
+ >>> # Gets all libraries user can view in a single SQL query
+ >>> visible_libraries = perms['libraries.view_library'].filter(
+ ... request.user,
+ ... ContentLibrary.objects.all()
+ ... )
+
+ Individual object checks:
+ >>> library = ContentLibrary.objects.get(org__short_name='DemoX', slug='CSPROB')
+ >>> if perms['libraries.view_library'].check(request.user, library):
+ ... # User can view this specific library
+
+ Note:
+ The library keys in authorization scopes must have the format 'lib:ORG:SLUG'
+ to match the ContentLibrary model's org.short_name and slug fields.
+ For example, scope 'lib:DemoX:CSPROB' matches a library with
+ org.short_name='DemoX' and slug='CSPROB'.
+ """
+
+ def __init__(self, permission: authz_api.PermissionData, filter_keys: list[str] | None = None):
+ """Initialize the rule with the action and filter keys to filter on.
+
+ Args:
+ permission (PermissionData): The permission to check (e.g., 'view', 'edit').
+ filter_keys (list[str]): The model fields to filter on when building QuerySet filters.
+ Defaults to ['org', 'slug'] for ContentLibrary.
+ """
+ self.permission = permission
+ self.filter_keys = filter_keys if filter_keys is not None else ["org", "slug"]
+
+ def query(self, user):
+ """Convert this rule to a Django Q object for QuerySet filtering.
+
+ Args:
+ user: The Django user object (must have a 'username' attribute).
+
+ Returns:
+ Q: A Django Q object that can be used to filter a QuerySet.
+ The Q object combines multiple conditions using OR (|) operators,
+ where each condition matches a library's org and slug fields:
+ Q(org__short_name='OrgA' & slug='lib-a') | Q(org__short_name='OrgB' & slug='lib-b')
+
+ Example:
+ >>> # User has 'view' permission in scopes: ['lib:OrgA:lib-a', 'lib:OrgB:lib-b']
+ >>> rule = HasPermissionInContentLibraryScope('view', filter_keys=['org', 'slug'])
+ >>> q = rule.query(user)
+ >>> # Results in: Q(org__short_name='OrgA', slug='lib-a') | Q(org__short_name='OrgB', slug='lib-b')
+ >>>
+ >>> # Apply to queryset
+ >>> libraries = ContentLibrary.objects.filter(q)
+ >>> # SQL: SELECT * FROM content_library
+ >>> # WHERE (org.short_name='OrgA' AND slug='lib-a')
+ >>> # OR (org.short_name='OrgB' AND slug='lib-b')
+ """
+ scopes = authz_api.get_scopes_for_user_and_permission(
+ user.username,
+ self.permission.identifier
+ )
+
+ library_keys = [scope.library_key for scope in scopes]
+
+ if not library_keys:
+ return Q(pk__in=[]) # No access, return Q that matches nothing
+
+ # Build Q object: OR together (org AND slug) conditions for each library
+ query = Q()
+ for library_key in library_keys:
+ query |= Q(org__short_name=library_key.org, slug=library_key.slug)
+
+ return query
+
+ def check(self, user, instance, *args, **kwargs): # pylint: disable=arguments-differ
+ """Check if user has permission for a specific object instance.
+
+ This method is used for checking permission on individual objects rather
+ than filtering a QuerySet. It extracts the scope from the object and
+ checks if the user has the required permission in that scope via Casbin.
+
+ Args:
+ user: The Django user object (must have a 'username' attribute).
+ instance: The Django model instance to check permission for.
+ *args: Additional positional arguments (for compatibility with parent signature).
+ **kwargs: Additional keyword arguments (for compatibility with parent signature).
+
+ Returns:
+ bool: True if the user has the permission in the object's scope,
+ False otherwise.
+
+ Example:
+ >>> rule = HasPermissionInContentLibraryScope('view')
+ >>> can_view = rule.check(user, library)
+ >>> # Checks if user has 'view' permission in scope 'lib:DemoX:CSPROB'
+ """
+ return authz_api.is_user_allowed(user.username, self.permission.identifier, str(instance.library_key))
+
+
########################### Permissions ###########################
# Is the user allowed to view XBlocks from the specified content library
@@ -87,7 +239,9 @@ def is_course_creator(user):
is_global_staff |
# Libraries with "public read" permissions can be accessed only by course creators
(Attribute('allow_public_read', True) & is_course_creator) |
- # Otherwise the user must be part of the library's team
+ # Users can access libraries within their authorized scope (via Casbin/role-based permissions)
+ HasPermissionInContentLibraryScope(VIEW_LIBRARY) |
+ # Fallback to: the user must be part of the library's team (legacy permission system)
has_explicit_read_permission_for_library
)
diff --git a/openedx/core/djangoapps/content_libraries/rest_api/blocks.py b/openedx/core/djangoapps/content_libraries/rest_api/blocks.py
index b93b48e5ad86..7aa15f6e7834 100644
--- a/openedx/core/djangoapps/content_libraries/rest_api/blocks.py
+++ b/openedx/core/djangoapps/content_libraries/rest_api/blocks.py
@@ -9,6 +9,7 @@
from django.utils.decorators import method_decorator
from drf_yasg.utils import swagger_auto_schema
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2
+from openedx_authz.constants import permissions as authz_permissions
from openedx_learning.api import authoring as authoring_api
from rest_framework import status
from rest_framework.exceptions import NotFound, ValidationError
@@ -238,7 +239,7 @@ def post(self, request, usage_key_str):
api.require_permission_for_library_key(
key.lib_key,
request.user,
- permissions.CAN_EDIT_THIS_CONTENT_LIBRARY
+ authz_permissions.PUBLISH_LIBRARY_CONTENT
)
api.publish_component_changes(key, request.user)
return Response({})
diff --git a/openedx/core/djangoapps/content_libraries/rest_api/collections.py b/openedx/core/djangoapps/content_libraries/rest_api/collections.py
index d893d766d80f..f4d579aa04a2 100644
--- a/openedx/core/djangoapps/content_libraries/rest_api/collections.py
+++ b/openedx/core/djangoapps/content_libraries/rest_api/collections.py
@@ -13,6 +13,7 @@
from rest_framework.status import HTTP_204_NO_CONTENT
from opaque_keys.edx.locator import LibraryLocatorV2
+from openedx_authz.constants import permissions as authz_permissions
from openedx_learning.api import authoring as authoring_api
from openedx_learning.api.authoring_models import Collection
@@ -56,7 +57,6 @@ def get_content_library(self) -> ContentLibrary:
if self.request.method in ['OPTIONS', 'GET']
else permissions.CAN_EDIT_THIS_CONTENT_LIBRARY
)
-
self._content_library = api.require_permission_for_library_key(
library_key,
self.request.user,
@@ -110,6 +110,11 @@ def create(self, request: RestRequest, *args, **kwargs) -> Response:
Create a Collection that belongs to a Content Library
"""
content_library = self.get_content_library()
+ api.require_permission_for_library_key(
+ content_library.library_key,
+ request.user,
+ authz_permissions.CREATE_LIBRARY_COLLECTION
+ )
create_serializer = ContentLibraryCollectionUpdateSerializer(data=request.data)
create_serializer.is_valid(raise_exception=True)
@@ -144,6 +149,11 @@ def partial_update(self, request: RestRequest, *args, **kwargs) -> Response:
Update a Collection that belongs to a Content Library
"""
content_library = self.get_content_library()
+ api.require_permission_for_library_key(
+ content_library.library_key,
+ request.user,
+ authz_permissions.EDIT_LIBRARY_COLLECTION
+ )
collection_key = kwargs["key"]
update_serializer = ContentLibraryCollectionUpdateSerializer(
@@ -165,6 +175,12 @@ def destroy(self, request: RestRequest, *args, **kwargs) -> Response:
"""
Soft-deletes a Collection that belongs to a Content Library
"""
+ content_library = self.get_content_library()
+ api.require_permission_for_library_key(
+ content_library.library_key,
+ request.user,
+ authz_permissions.DELETE_LIBRARY_COLLECTION
+ )
collection = super().get_object()
assert collection.learning_package_id
authoring_api.delete_collection(
@@ -181,6 +197,11 @@ def restore(self, request: RestRequest, *args, **kwargs) -> Response:
Restores a soft-deleted Collection that belongs to a Content Library
"""
content_library = self.get_content_library()
+ api.require_permission_for_library_key(
+ content_library.library_key,
+ request.user,
+ authz_permissions.EDIT_LIBRARY_COLLECTION
+ )
assert content_library.learning_package_id
collection_key = kwargs["key"]
authoring_api.restore_collection(
@@ -198,6 +219,11 @@ def update_items(self, request: RestRequest, *args, **kwargs) -> Response:
Collection and items must all be part of the given library/learning package.
"""
content_library = self.get_content_library()
+ api.require_permission_for_library_key(
+ content_library.library_key,
+ request.user,
+ authz_permissions.EDIT_LIBRARY_COLLECTION
+ )
collection_key = kwargs["key"]
serializer = ContentLibraryItemKeysSerializer(data=request.data)
diff --git a/openedx/core/djangoapps/content_libraries/rest_api/containers.py b/openedx/core/djangoapps/content_libraries/rest_api/containers.py
index 67070a0a82f9..c60c40b9802d 100644
--- a/openedx/core/djangoapps/content_libraries/rest_api/containers.py
+++ b/openedx/core/djangoapps/content_libraries/rest_api/containers.py
@@ -12,6 +12,7 @@
from drf_yasg import openapi
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryContainerLocator
+from openedx_authz.constants import permissions as authz_permissions
from openedx_learning.api import authoring as authoring_api
from rest_framework.generics import GenericAPIView
from rest_framework.response import Response
@@ -379,7 +380,7 @@ def post(self, request: RestRequest, container_key: LibraryContainerLocator) ->
api.require_permission_for_library_key(
container_key.lib_key,
request.user,
- permissions.CAN_EDIT_THIS_CONTENT_LIBRARY,
+ authz_permissions.PUBLISH_LIBRARY_CONTENT
)
api.publish_container_changes(container_key, request.user.id)
# If we need to in the future, we could return a list of all the child containers/components that were
diff --git a/openedx/core/djangoapps/content_libraries/rest_api/libraries.py b/openedx/core/djangoapps/content_libraries/rest_api/libraries.py
index 9f6cca19947a..2d50fa6c8644 100644
--- a/openedx/core/djangoapps/content_libraries/rest_api/libraries.py
+++ b/openedx/core/djangoapps/content_libraries/rest_api/libraries.py
@@ -82,6 +82,7 @@
from user_tasks.models import UserTaskStatus
from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2
+from openedx_authz.constants import permissions as authz_permissions
from organizations.api import ensure_organization
from organizations.exceptions import InvalidOrganizationException
from organizations.models import Organization
@@ -219,7 +220,7 @@ def post(self, request):
"""
Create a new content library.
"""
- if not request.user.has_perm(permissions.CAN_CREATE_CONTENT_LIBRARY):
+ if not api.user_can_create_library(request.user):
raise PermissionDenied
serializer = ContentLibraryMetadataSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
@@ -479,7 +480,11 @@ def post(self, request, lib_key_str):
descendants.
"""
key = LibraryLocatorV2.from_string(lib_key_str)
- api.require_permission_for_library_key(key, request.user, permissions.CAN_EDIT_THIS_CONTENT_LIBRARY)
+ api.require_permission_for_library_key(
+ key,
+ request.user,
+ authz_permissions.PUBLISH_LIBRARY_CONTENT
+ )
api.publish_changes(key, request.user.id)
return Response({})
@@ -838,7 +843,7 @@ def post(self, request):
"""
Restore a library from a backup file.
"""
- if not request.user.has_perm(permissions.CAN_CREATE_CONTENT_LIBRARY):
+ if not api.user_can_create_library(request.user):
raise PermissionDenied
serializer = LibraryRestoreFileSerializer(data=request.data)
diff --git a/openedx/core/djangoapps/content_libraries/rest_api/serializers.py b/openedx/core/djangoapps/content_libraries/rest_api/serializers.py
index a1e24c6a64a4..c0bf07d087fc 100644
--- a/openedx/core/djangoapps/content_libraries/rest_api/serializers.py
+++ b/openedx/core/djangoapps/content_libraries/rest_api/serializers.py
@@ -14,6 +14,7 @@
from user_tasks.models import UserTaskStatus
from openedx.core.djangoapps.content_libraries.tasks import LibraryRestoreTask
+from openedx.core.djangoapps.content_libraries import api
from openedx.core.djangoapps.content_libraries.api.containers import ContainerType
from openedx.core.djangoapps.content_libraries.constants import ALL_RIGHTS_RESERVED, LICENSE_OPTIONS
from openedx.core.djangoapps.content_libraries.models import (
@@ -75,7 +76,8 @@ def get_can_edit_library(self, obj):
return False
library_obj = ContentLibrary.objects.get_by_key(obj.key)
- return user.has_perm(permissions.CAN_EDIT_THIS_CONTENT_LIBRARY, obj=library_obj)
+ return api.user_has_permission_across_lib_authz_systems(
+ user, permissions.CAN_EDIT_THIS_CONTENT_LIBRARY, library_obj)
class ContentLibraryUpdateSerializer(serializers.Serializer):
diff --git a/openedx/core/djangoapps/content_libraries/tests/test_content_libraries.py b/openedx/core/djangoapps/content_libraries/tests/test_content_libraries.py
index c4f61f47e254..91a9c29a3754 100644
--- a/openedx/core/djangoapps/content_libraries/tests/test_content_libraries.py
+++ b/openedx/core/djangoapps/content_libraries/tests/test_content_libraries.py
@@ -12,14 +12,17 @@
import ddt
import tomlkit
+from bridgekeeper import perms
from django.core.files.uploadedfile import SimpleUploadedFile
from django.contrib.auth.models import Group
+from django.db.models import Q
from django.test import override_settings
from django.test.client import Client
from freezegun import freeze_time
-from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2
+from opaque_keys.edx.locator import LibraryLocatorV2, LibraryUsageLocatorV2, LibraryCollectionLocator
from organizations.models import Organization
from rest_framework.test import APITestCase
+from rest_framework import status
from openedx_learning.api.authoring_models import LearningPackage
from user_tasks.models import UserTaskStatus, UserTaskArtifact
@@ -33,10 +36,15 @@
URL_BLOCK_XBLOCK_HANDLER,
ContentLibrariesRestApiTest,
)
+from openedx_authz import api as authz_api
+from openedx_authz.constants import roles
+from openedx_authz.engine.enforcer import AuthzEnforcer
from openedx.core.djangoapps.xblock import api as xblock_api
from openedx.core.djangolib.testing.utils import skip_unless_cms
+from openedx_authz.constants.permissions import VIEW_LIBRARY
-from ..models import ContentLibrary
+from ..models import ContentLibrary, ContentLibraryPermission
+from ..permissions import CAN_VIEW_THIS_CONTENT_LIBRARY, HasPermissionInContentLibraryScope
@skip_unless_cms
@@ -1217,6 +1225,462 @@ def test_uncaught_error_creates_error_log(self):
self.assertEqual(task_data, expected)
+@skip_unless_cms
+class ContentLibrariesAuthZTestCase(ContentLibrariesRestApiTest):
+ """
+ Tests for Content Libraries AuthZ integration via openedx-authz.
+
+ These tests verify the HasPermissionInContentLibraryScope Bridgekeeper rule
+ integrates correctly with the openedx-authz authorization system (Casbin).
+ See: https://github.com/openedx/openedx-authz/
+
+ IMPORTANT: These tests explicitly remove legacy ContentLibraryPermission grants
+ to ensure ONLY the AuthZ system is being tested, not the legacy fallback.
+ """
+
+ def setUp(self):
+ super().setUp()
+ # The parent class provides self.user (a staff user) and self.organization
+ # Set up admin_user as an alias to self.user for test readability
+ self.admin_user = self.user
+ # Set up org_short_name for convenience
+ self.org_short_name = self.organization.short_name
+
+ def test_authz_scope_filters_by_authorized_libraries(self):
+ """
+ Test that HasPermissionInContentLibraryScope rule filters libraries
+ based on authorized org/slug combinations.
+
+ Given:
+ - 3 libraries: lib1 (org1), lib2 (org2), lib3 (org1)
+ - User authorized for lib1 and lib2 only via AuthZ (NO legacy permissions)
+
+ Expected:
+ - Filter returns exactly 2 libraries (lib1 and lib2)
+ - lib3 is excluded (same org as lib1, but different slug)
+ - Correct org/slug combinations are matched
+ """
+ user = UserFactory.create(username="scope_user", is_staff=False)
+
+ Organization.objects.get_or_create(short_name="org1", defaults={"name": "Org 1"})
+ Organization.objects.get_or_create(short_name="org2", defaults={"name": "Org 2"})
+
+ with self.as_user(self.admin_user):
+ lib1 = self._create_library(slug="lib1", org="org1", title="Library 1")
+ lib2 = self._create_library(slug="lib2", org="org2", title="Library 2")
+ self._create_library(slug="lib3", org="org1", title="Library 3")
+
+ # CRITICAL: Ensure user has NO legacy permissions (test ONLY AuthZ filtering)
+ ContentLibraryPermission.objects.filter(user=user).delete()
+
+ with patch(
+ 'openedx_authz.api.get_scopes_for_user_and_permission'
+ ) as mock_get_scopes:
+ # Mock: User authorized for lib1 (org1:lib1) and lib2 (org2:lib2) only, NOT lib3
+ mock_scope1 = type('Scope', (), {'library_key': LibraryLocatorV2.from_string(lib1['id'])})()
+ mock_scope2 = type('Scope', (), {'library_key': LibraryLocatorV2.from_string(lib2['id'])})()
+ mock_get_scopes.return_value = [mock_scope1, mock_scope2]
+
+ all_libs = ContentLibrary.objects.filter(slug__in=['lib1', 'lib2', 'lib3'])
+ filtered = perms[CAN_VIEW_THIS_CONTENT_LIBRARY].filter(user, all_libs).distinct()
+
+ # TEST: Verify exactly 2 libraries returned (lib1 and lib2, not lib3)
+ self.assertEqual(filtered.count(), 2, "Should return exactly 2 authorized libraries")
+
+ # TEST: Verify correct libraries are included/excluded
+ slugs = set(filtered.values_list('slug', flat=True))
+ self.assertIn('lib1', slugs, "lib1 (org1:lib1) should be included")
+ self.assertIn('lib2', slugs, "lib2 (org2:lib2) should be included")
+ self.assertNotIn('lib3', slugs, "lib3 (org1:lib3) should be excluded")
+
+ # TEST: Verify the org/slug combinations match
+ lib1_result = filtered.get(slug='lib1')
+ lib2_result = filtered.get(slug='lib2')
+ self.assertEqual(lib1_result.org.short_name, 'org1')
+ self.assertEqual(lib2_result.org.short_name, 'org2')
+
+ def test_authz_scope_individual_check_with_permission(self):
+ """
+ Test that HasPermissionInContentLibraryScope.check() returns True
+ when authorization is granted.
+
+ Given:
+ - Non-staff user
+ - Library exists
+ - Authorization system grants permission (mocked)
+ - NO legacy permissions
+
+ Expected:
+ - check() returns True
+ """
+ user = UserFactory.create(username="check_user", is_staff=False)
+
+ with self.as_user(self.admin_user):
+ lib = self._create_library(slug="check-lib", org=self.org_short_name, title="Check Library")
+
+ library_obj = ContentLibrary.objects.get_by_key(LibraryLocatorV2.from_string(lib["id"]))
+
+ # CRITICAL: Ensure user has NO legacy permissions (test ONLY AuthZ)
+ ContentLibraryPermission.objects.filter(user=user).delete()
+
+ with patch("openedx_authz.api.is_user_allowed", return_value=True):
+ result = perms[CAN_VIEW_THIS_CONTENT_LIBRARY].check(user, library_obj)
+
+ self.assertTrue(result, "Should return True when user is authorized")
+
+ def test_authz_scope_individual_check_without_permission(self):
+ """
+ Test that HasPermissionInContentLibraryScope.check() returns False
+ when authorization is denied.
+
+ Given:
+ - Non-staff user
+ - Non-public library
+ - Authorization system denies permission (mocked)
+ - NO legacy permissions
+
+ Expected:
+ - check() returns False
+ """
+ user = UserFactory.create(username="no_perm_user", is_staff=False)
+
+ with self.as_user(self.admin_user):
+ lib = self._create_library(slug="no-perm-lib", org=self.org_short_name, title="No Permission Library")
+
+ library_obj = ContentLibrary.objects.get_by_key(LibraryLocatorV2.from_string(lib['id']))
+
+ # CRITICAL: Ensure user has NO legacy permissions (test ONLY AuthZ)
+ ContentLibraryPermission.objects.filter(user=user).delete()
+
+ with patch('openedx_authz.api.is_user_allowed', return_value=False):
+ result = perms[CAN_VIEW_THIS_CONTENT_LIBRARY].check(user, library_obj)
+
+ self.assertFalse(result, "Should return False when user is not authorized")
+
+ self.assertFalse(library_obj.allow_public_read)
+ self.assertFalse(user.is_staff)
+
+ def test_authz_scope_handles_empty_scopes(self):
+ """
+ Test that HasPermissionInContentLibraryScope.query() returns empty
+ result when user has no authorized scopes.
+
+ Given:
+ - Non-staff user
+ - Library exists in database
+ - Authorization system returns empty scope list (mocked)
+ - NO legacy permissions
+
+ Expected:
+ - Filter returns 0 libraries
+ - Library exists in database but is not accessible
+ """
+ user = UserFactory.create(username="empty_user", is_staff=False)
+
+ with self.as_user(self.admin_user):
+ self._create_library(slug="empty-lib", title="Empty Scopes Test")
+
+ # CRITICAL: Ensure user has NO legacy permissions (test ONLY AuthZ)
+ ContentLibraryPermission.objects.filter(user=user).delete()
+
+ with patch(
+ 'openedx_authz.api.get_scopes_for_user_and_permission',
+ return_value=[]
+ ):
+ filtered = perms[CAN_VIEW_THIS_CONTENT_LIBRARY].filter(
+ user,
+ ContentLibrary.objects.filter(slug="empty-lib")
+ ).distinct()
+
+ self.assertEqual(
+ filtered.count(),
+ 0,
+ "Should return 0 libraries when user has no authorized scopes",
+ )
+
+ self.assertTrue(
+ ContentLibrary.objects.filter(slug="empty-lib").exists(),
+ "Library should exist in database",
+ )
+
+ def test_authz_scope_q_object_has_correct_structure(self):
+ """
+ Test that HasPermissionInContentLibraryScope.query() generates Q object
+ with structure: Q(org__short_name='X') & Q(slug='Y') for each scope.
+
+ Multiple scopes should be OR'd:
+ (Q(org__short_name='org1') & Q(slug='lib1')) | (Q(org__short_name='org2') & Q(slug='lib2'))
+
+ Note: This test focuses on Q object structure, not filtering behavior,
+ so legacy permissions don't affect the outcome.
+ """
+ user = UserFactory.create(username="q_user")
+ rule = HasPermissionInContentLibraryScope(VIEW_LIBRARY, filter_keys=['org', 'slug'])
+
+ with patch(
+ "openedx_authz.api.get_scopes_for_user_and_permission"
+ ) as mock_get_scopes:
+ # Create scopes with specific org/slug values we can verify
+ mock_scope1 = type("Scope", (), {
+ "library_key": type("Key", (), {"org": "specific-org1", "slug": "specific-slug1"})()
+ })()
+ mock_scope2 = type("Scope", (), {
+ "library_key": type("Key", (), {"org": "specific-org2", "slug": "specific-slug2"})()
+ })()
+ mock_get_scopes.return_value = [mock_scope1, mock_scope2]
+
+ q_obj = rule.query(user)
+
+ # Test 1: Verify it returns a Q object
+ self.assertIsInstance(q_obj, Q)
+
+ # Test 2: Verify Q object uses OR connector (for multiple scopes)
+ self.assertEqual(
+ q_obj.connector,
+ 'OR',
+ "Should use OR to combine different library scopes",
+ )
+
+ # Test 3: Verify the Q object string contains the exact fields and values
+ q_str = str(q_obj)
+
+ # Should filter by org__short_name field
+ self.assertIn(
+ "org__short_name",
+ q_str,
+ "Q object must filter by org__short_name field",
+ )
+
+ # Should filter by slug field
+ self.assertIn(
+ "slug",
+ q_str,
+ "Q object must filter by slug field",
+ )
+
+ # Should contain exact org values
+ self.assertIn(
+ "specific-org1",
+ q_str,
+ "Q object must include 'specific-org1'",
+ )
+ self.assertIn(
+ "specific-org2",
+ q_str,
+ "Q object must include 'specific-org2'",
+ )
+
+ # Should contain exact slug values
+ self.assertIn(
+ "specific-slug1",
+ q_str,
+ "Q object must include 'specific-slug1'",
+ )
+ self.assertIn(
+ 'specific-slug2',
+ q_str,
+ "Q object must include 'specific-slug2'",
+ )
+
+ def test_authz_scope_q_object_matches_exact_org_slug_pairs(self):
+ """
+ Test that the Q object filters by EXACT (org, slug) pairs, not just org OR slug.
+
+ Critical test: Verifies the rule generates:
+ Q(org__short_name='org1' AND slug='lib1') OR Q(org__short_name='org2' AND slug='lib2')
+
+ NOT just:
+ Q(org__short_name IN ['org1', 'org2']) OR Q(slug IN ['lib1', 'lib2'])
+
+ Creates scenario:
+ - lib1: org1 + lib1 (authorized)
+ - lib2: org2 + lib2 (authorized)
+ - lib3: org1 + lib3 (NOT authorized - same org, different slug)
+ - lib4: org3 + lib1 (NOT authorized - same slug, different org)
+ """
+ user = UserFactory.create(username="exact_pair_user")
+ rule = HasPermissionInContentLibraryScope(VIEW_LIBRARY, filter_keys=['org', 'slug'])
+
+ Organization.objects.get_or_create(short_name="pair-org1", defaults={"name": "Pair Org 1"})
+ Organization.objects.get_or_create(short_name="pair-org2", defaults={"name": "Pair Org 2"})
+ Organization.objects.get_or_create(short_name="pair-org3", defaults={"name": "Pair Org 3"})
+
+ with self.as_user(self.admin_user):
+ lib1 = self._create_library(slug="pair-lib1", org="pair-org1", title="Pair Lib 1")
+ lib2 = self._create_library(slug="pair-lib2", org="pair-org2", title="Pair Lib 2")
+ self._create_library(slug="pair-lib3", org="pair-org1", title="Pair Lib 3") # Same org as lib1
+ self._create_library(slug="pair-lib1", org="pair-org3", title="Pair Lib 4") # Same slug as lib1
+
+ # CRITICAL: Ensure user has NO legacy permissions (test ONLY AuthZ filtering)
+ ContentLibraryPermission.objects.filter(user=user).delete()
+
+ with patch(
+ 'openedx_authz.api.get_scopes_for_user_and_permission'
+ ) as mock_get_scopes:
+ # Authorize ONLY (pair-org1, pair-lib1) and (pair-org2, pair-lib2)
+ lib1_key = LibraryLocatorV2.from_string(lib1['id'])
+ lib2_key = LibraryLocatorV2.from_string(lib2['id'])
+
+ mock_get_scopes.return_value = [
+ type('Scope', (), {'library_key': lib1_key})(),
+ type('Scope', (), {'library_key': lib2_key})(),
+ ]
+
+ q_obj = rule.query(user)
+ filtered = ContentLibrary.objects.filter(q_obj)
+
+ # TEST: Verify EXACTLY 2 libraries match (lib1 and lib2 only)
+ self.assertEqual(
+ filtered.count(),
+ 2,
+ "Must match EXACTLY 2 libraries - only those with authorized (org, slug) pairs",
+ )
+
+ # TEST: Verify lib1 matches (pair-org1, pair-lib1)
+ lib1_result = filtered.filter(slug='pair-lib1', org__short_name='pair-org1')
+ self.assertEqual(
+ lib1_result.count(),
+ 1,
+ "Must match lib1: (pair-org1, pair-lib1) - this exact pair is authorized",
+ )
+
+ # TEST: Verify lib2 matches (pair-org2, pair-lib2)
+ lib2_result = filtered.filter(slug='pair-lib2', org__short_name='pair-org2')
+ self.assertEqual(
+ lib2_result.count(),
+ 1,
+ "Must match lib2: (pair-org2, pair-lib2) - this exact pair is authorized",
+ )
+
+ # TEST: Verify lib3 does NOT match (pair-org1, pair-lib3)
+ lib3_result = filtered.filter(slug='pair-lib3', org__short_name='pair-org1')
+ self.assertEqual(
+ lib3_result.count(),
+ 0,
+ "Must NOT match lib3: (pair-org1, pair-lib3) - only pair-lib1 is authorized for pair-org1",
+ )
+
+ # TEST: Verify lib4 does NOT match (pair-org3, pair-lib1)
+ lib4_result = filtered.filter(slug='pair-lib1', org__short_name='pair-org3')
+ self.assertEqual(
+ lib4_result.count(),
+ 0,
+ "Must NOT match lib4: (pair-org3, pair-lib1) - only pair-org1 is authorized for pair-lib1",
+ )
+
+ # TEST: Verify the result set contains exactly the right libraries
+ result_pairs = set(filtered.values_list('org__short_name', 'slug'))
+ expected_pairs = {('pair-org1', 'pair-lib1'), ('pair-org2', 'pair-lib2')}
+ self.assertEqual(
+ result_pairs,
+ expected_pairs,
+ f"Result must contain exactly {expected_pairs}, got {result_pairs}",
+ )
+
+ def test_authz_scope_with_combined_authz_and_legacy_permissions(self):
+ """
+ Test that the filter returns libraries when user has BOTH AuthZ AND legacy permissions.
+
+ The CAN_VIEW_THIS_CONTENT_LIBRARY permission uses OR logic:
+ is_user_active & (
+ is_global_staff |
+ (allow_public_read & is_course_creator) |
+ HasPermissionInContentLibraryScope(VIEW_LIBRARY) | # AuthZ
+ has_explicit_read_permission_for_library # Legacy
+ )
+
+ This means a user with BOTH types of permissions should get access through EITHER system.
+
+ Test scenario:
+ - lib1: User has AuthZ permission only
+ - lib2: User has legacy permission only
+ - lib3: User has BOTH AuthZ AND legacy permissions
+ - lib4: User has NO permissions
+
+ Expected behavior:
+ - Filter returns lib1, lib2, and lib3 (NOT lib4)
+ - Having both permission types doesn't break filtering
+ - Each permission system contributes its authorized libraries
+ """
+ user = UserFactory.create(username="combined_perm_user", is_staff=False)
+
+ Organization.objects.get_or_create(short_name="comb-org", defaults={"name": "Combined Org"})
+
+ with self.as_user(self.admin_user):
+ lib1 = self._create_library(slug="comb-lib1", org="comb-org", title="AuthZ Only Library")
+ lib2 = self._create_library(slug="comb-lib2", org="comb-org", title="Legacy Only Library")
+ lib3 = self._create_library(slug="comb-lib3", org="comb-org", title="Both AuthZ and Legacy Library")
+ lib4 = self._create_library(slug="comb-lib4", org="comb-org", title="No Permissions Library")
+
+ # Retrieve library objects for permission assignment
+ lib1_obj = ContentLibrary.objects.get_by_key(LibraryLocatorV2.from_string(lib1['id']))
+ lib2_obj = ContentLibrary.objects.get_by_key(LibraryLocatorV2.from_string(lib2['id']))
+ lib3_obj = ContentLibrary.objects.get_by_key(LibraryLocatorV2.from_string(lib3['id']))
+
+ # Set up legacy permissions: lib2 (legacy only), lib3 (both)
+ ContentLibraryPermission.objects.create(
+ library=lib2_obj,
+ user=user,
+ access_level=ContentLibraryPermission.READ_LEVEL,
+ )
+ ContentLibraryPermission.objects.create(
+ library=lib3_obj,
+ user=user,
+ access_level=ContentLibraryPermission.READ_LEVEL,
+ )
+
+ with patch(
+ 'openedx_authz.api.get_scopes_for_user_and_permission'
+ ) as mock_get_scopes:
+ # Set up AuthZ permissions: lib1 (AuthZ only), lib3 (both)
+ lib1_key = LibraryLocatorV2.from_string(lib1['id'])
+ lib3_key = LibraryLocatorV2.from_string(lib3['id'])
+
+ mock_get_scopes.return_value = [
+ type('Scope', (), {'library_key': lib1_key})(),
+ type('Scope', (), {'library_key': lib3_key})(),
+ ]
+
+ all_libs = ContentLibrary.objects.filter(slug__in=['comb-lib1', 'comb-lib2', 'comb-lib3', 'comb-lib4'])
+ filtered = perms[CAN_VIEW_THIS_CONTENT_LIBRARY].filter(user, all_libs).distinct()
+
+ # TEST: Verify exactly 3 libraries returned (lib1, lib2, lib3 - NOT lib4)
+ self.assertEqual(
+ filtered.count(),
+ 3,
+ "Should return exactly 3 libraries: AuthZ-only, legacy-only, and both",
+ )
+
+ # TEST: Verify correct libraries are included
+ slugs = set(filtered.values_list('slug', flat=True))
+ self.assertIn('comb-lib1', slugs, "lib1 should be accessible via AuthZ permission")
+ self.assertIn('comb-lib2', slugs, "lib2 should be accessible via legacy permission")
+ self.assertIn('comb-lib3', slugs, "lib3 should be accessible via BOTH AuthZ and legacy permissions")
+ self.assertNotIn('comb-lib4', slugs, "lib4 should NOT be accessible (no permissions)")
+
+ # TEST: Verify lib3 doesn't get duplicated despite having both permission types
+ lib3_results = filtered.filter(slug='comb-lib3')
+ self.assertEqual(
+ lib3_results.count(),
+ 1,
+ "lib3 should appear exactly once despite having both AuthZ and legacy permissions",
+ )
+
+ # TEST: Verify the permission sources work independently
+ # This demonstrates the OR logic: user gets access if EITHER permission type grants it
+ result_pairs = set(filtered.values_list('org__short_name', 'slug'))
+ expected_pairs = {
+ ('comb-org', 'comb-lib1'), # AuthZ only
+ ('comb-org', 'comb-lib2'), # Legacy only
+ ('comb-org', 'comb-lib3'), # Both
+ }
+ self.assertEqual(
+ result_pairs,
+ expected_pairs,
+ f"Should get exactly the 3 authorized libraries via OR logic, got {result_pairs}",
+ )
+
+
@ddt.ddt
class ContentLibraryXBlockValidationTest(APITestCase):
"""Tests only focused on service validation, no Learning Core interactions here."""
@@ -1244,3 +1708,282 @@ def test_xblock_handler_invalid_key(self):
secure_token='random',
)))
self.assertEqual(response.status_code, 404)
+
+
+@skip_unless_cms
+class ContentLibrariesRestAPIAuthzIntegrationTestCase(ContentLibrariesRestApiTest):
+ """
+ Test that Content Libraries REST API endpoints respect AuthZ roles and permissions.
+
+ Roles tested:
+ 1. Library Admin: Full access to all library operations.
+ 2. Library Author: Can view and edit library content, but cannot delete the library.
+ 3. Library Contributor: Can view and edit library content, but cannot delete or publish the library.
+ 4. Library User: Can only view library content.
+ """
+
+ def setUp(self):
+ super().setUp()
+ self._seed_database_with_policies()
+
+ self.library_admin = UserFactory.create(
+ username="library_admin",
+ email="libadmin@example.com")
+ self.library_author = UserFactory.create(
+ username="library_author",
+ email="libauthor@example.com")
+ self.library_contributor = UserFactory.create(
+ username="library_contributor",
+ email="libcontributor@example.com")
+ self.library_user = UserFactory.create(
+ username="library_user",
+ email="libuser@example.com")
+ self.random_user = UserFactory.create(
+ username="random_user",
+ email="random@example.com")
+
+ # Define user groups by permission level
+ self.list_of_all_users = [
+ self.library_admin,
+ self.library_author,
+ self.library_contributor,
+ self.library_user,
+ self.random_user,
+ ]
+ self.library_viewers = [self.library_admin, self.library_author, self.library_contributor, self.library_user]
+ self.library_editors = [self.library_admin, self.library_author, self.library_contributor]
+ self.library_publishers = [self.library_admin, self.library_author]
+ self.library_collection_editors = [self.library_admin, self.library_author, self.library_contributor]
+ self.library_deleters = [self.library_admin]
+
+ # Create library and assign roles
+ library = self._create_library(
+ slug="authzlib",
+ title="AuthZ Test Library",
+ description="Testing AuthZ",
+ )
+ self.lib_id = library["id"]
+
+ authz_api.assign_role_to_user_in_scope(
+ self.library_admin.username,
+ roles.LIBRARY_ADMIN.external_key, self.lib_id)
+ authz_api.assign_role_to_user_in_scope(
+ self.library_author.username,
+ roles.LIBRARY_AUTHOR.external_key, self.lib_id)
+ authz_api.assign_role_to_user_in_scope(
+ self.library_contributor.username,
+ roles.LIBRARY_CONTRIBUTOR.external_key, self.lib_id)
+ authz_api.assign_role_to_user_in_scope(
+ self.library_user.username,
+ roles.LIBRARY_USER.external_key, self.lib_id)
+ AuthzEnforcer.get_enforcer().load_policy() # Load policies to simulate fresh start
+
+ def tearDown(self):
+ """Clean up after each test to ensure isolation."""
+ super().tearDown()
+ AuthzEnforcer.get_enforcer().clear_policy() # Clear policies after each test to ensure isolation
+
+ @classmethod
+ def _seed_database_with_policies(cls):
+ """Seed the database with policies from the policy file.
+
+ This simulates the one-time database seeding that would happen
+ during application deployment, separate from the runtime policy loading.
+ """
+ import pkg_resources
+ from openedx_authz.engine.utils import migrate_policy_between_enforcers
+ import casbin
+
+ global_enforcer = AuthzEnforcer.get_enforcer()
+ global_enforcer.load_policy()
+ model_path = pkg_resources.resource_filename("openedx_authz.engine", "config/model.conf")
+ policy_path = pkg_resources.resource_filename("openedx_authz.engine", "config/authz.policy")
+
+ migrate_policy_between_enforcers(
+ source_enforcer=casbin.Enforcer(model_path, policy_path),
+ target_enforcer=global_enforcer,
+ )
+ global_enforcer.clear_policy() # Clear to simulate fresh start for each test
+
+ def _all_users_excluding(self, excluded_users):
+ return set(self.list_of_all_users) - set(excluded_users)
+
+ def test_view_permissions(self):
+ """
+ Verify that only users with view permissions can view.
+ """
+ # Test library view access
+ for user in self.library_viewers:
+ with self.as_user(user):
+ self._get_library(self.lib_id, expect_response=status.HTTP_200_OK)
+ for user in self._all_users_excluding(self.library_viewers):
+ with self.as_user(user):
+ self._get_library(self.lib_id, expect_response=status.HTTP_403_FORBIDDEN)
+
+ def test_edit_permissions(self):
+ """
+ Verify that only users with edit permissions can edit.
+ """
+ # Test library edit access
+ for user in self.library_editors:
+ with self.as_user(user):
+ self._update_library(
+ self.lib_id,
+ description=f"Description by {user.username}",
+ expect_response=status.HTTP_200_OK,
+ )
+ #Verify the permitted changes were made
+ data = self._get_library(self.lib_id)
+ assert data['description'] == f"Description by {user.username}"
+
+ for user in self._all_users_excluding(self.library_editors):
+ with self.as_user(user):
+ self._update_library(
+ self.lib_id,
+ description="I can't edit this.", expect_response=status.HTTP_403_FORBIDDEN)
+
+ # Verify the no permitted changes weren't made:
+ data = self._get_library(self.lib_id)
+ assert data['description'] != "I can't edit this."
+
+ # Library XBlock editing
+ for user in self.library_editors:
+ with self.as_user(user):
+ # They can create blocks
+ block_data = self._add_block_to_library(self.lib_id, "problem", f"problem_{user.username}")
+ # They can modify blocks
+ self._set_library_block_olx(
+ block_data["id"],
+ "",
+ expect_response=status.HTTP_200_OK)
+ self._set_library_block_fields(
+ block_data["id"],
+ {"data": "", "metadata": {}},
+ expect_response=status.HTTP_200_OK)
+ self._set_library_block_asset(
+ block_data["id"],
+ "static/test.txt",
+ b"data",
+ expect_response=status.HTTP_200_OK)
+ # They can remove blocks
+ self._delete_library_block(block_data["id"], expect_response=status.HTTP_200_OK)
+ # Verify deletion
+ self._get_library_block(block_data["id"], expect_response=404)
+
+ # Recreate blocks for further tests
+ block_data = self._add_block_to_library(self.lib_id, "problem", "new_problem")
+
+ for user in self._all_users_excluding(self.library_editors):
+ with self.as_user(user):
+ self._add_block_to_library(
+ self.lib_id,
+ "problem",
+ "problem1",
+ expect_response=status.HTTP_403_FORBIDDEN)
+ # They can't modify blocks
+ self._set_library_block_olx(
+ block_data["id"],
+ "",
+ expect_response=status.HTTP_403_FORBIDDEN)
+ self._set_library_block_fields(
+ block_data["id"],
+ {"data": "", "metadata": {}},
+ expect_response=status.HTTP_403_FORBIDDEN)
+ self._set_library_block_asset(
+ block_data["id"],
+ "static/test.txt",
+ b"data",
+ expect_response=status.HTTP_403_FORBIDDEN)
+ # They can't remove blocks
+ self._delete_library_block(block_data["id"], expect_response=status.HTTP_403_FORBIDDEN)
+
+ def test_publish_permissions(self):
+ """
+ Verify that only users with publish permissions can publish.
+ """
+ # Test publish access
+ for user in self.library_publishers:
+ with self.as_user(user):
+ block_data = self._add_block_to_library(self.lib_id, "problem", f"problem_{user.username}_1")
+ self._publish_library_block(block_data["id"], expect_response=status.HTTP_200_OK)
+ block_data = self._add_block_to_library(self.lib_id, "problem", f"problem_{user.username}_2")
+ assert self._get_library(self.lib_id)['has_unpublished_changes'] is True
+ self._commit_library_changes(self.lib_id, expect_response=status.HTTP_200_OK)
+ assert self._get_library(self.lib_id)['has_unpublished_changes'] is False
+
+ block_data = self._add_block_to_library(self.lib_id, "problem", "draft_problem")
+ assert self._get_library(self.lib_id)['has_unpublished_changes'] is True
+
+ for user in self._all_users_excluding(self.library_publishers):
+ with self.as_user(user):
+ self._publish_library_block(block_data["id"], expect_response=status.HTTP_403_FORBIDDEN)
+ self._commit_library_changes(self.lib_id, expect_response=status.HTTP_403_FORBIDDEN)
+ # Verify that no changes were published
+ assert self._get_library(self.lib_id)['has_unpublished_changes'] is True
+
+ def test_collection_permissions(self):
+ """
+ Verify that only users with collection permissions can perform collection actions.
+ """
+ library_key = LibraryLocatorV2.from_string(self.lib_id)
+ block_data = self._add_block_to_library(self.lib_id, "problem", "collection_problem")
+ # Test library collection access
+ for user in self.library_collection_editors:
+ with self.as_user(user):
+ # Create collection
+ collection_data = self._create_collection(
+ self.lib_id,
+ title=f"Temp Collection {user.username}",
+ expect_response=status.HTTP_200_OK)
+ collection_id = collection_data["key"]
+ collection_key = LibraryCollectionLocator(lib_key=library_key, collection_id=collection_id)
+ # Update collection
+ self._update_collection(collection_key, title="Updated Collection", expect_response=status.HTTP_200_OK)
+ self._add_items_to_collection(
+ collection_key,
+ item_keys=[block_data["id"]],
+ expect_response=status.HTTP_200_OK)
+ # Delete collection
+ self._soft_delete_collection(collection_key, expect_response=status.HTTP_204_NO_CONTENT)
+
+ collection_data = self._create_collection(
+ self.lib_id,
+ title="New Temp Collection",
+ expect_response=status.HTTP_200_OK)
+ collection_id = collection_data["key"]
+ collection_key = LibraryCollectionLocator(lib_key=library_key, collection_id=collection_id)
+
+ for user in self._all_users_excluding(self.library_collection_editors):
+ with self.as_user(user):
+ # Attempt to create collection
+ self._create_collection(
+ self.lib_id,
+ title="Unauthorized Collection",
+ expect_response=status.HTTP_403_FORBIDDEN)
+ # Attempt to update collection
+ self._update_collection(
+ collection_key,
+ title="Unauthorized Change",
+ expect_response=status.HTTP_403_FORBIDDEN)
+ self._add_items_to_collection(
+ collection_key,
+ item_keys=[block_data["id"]],
+ expect_response=status.HTTP_403_FORBIDDEN)
+ # Attempt to delete collection
+ self._soft_delete_collection(collection_key, expect_response=status.HTTP_403_FORBIDDEN)
+
+ def test_delete_library_permissions(self):
+ """
+ Verify that only users with delete permissions can delete a library.
+ """
+ # Test library delete access
+ for user in self._all_users_excluding(self.library_deleters):
+ with self.as_user(user):
+ result = self._delete_library(self.lib_id, expect_response=status.HTTP_403_FORBIDDEN)
+ assert 'detail' in result # Error message
+ assert 'permission' in result['detail'].lower()
+
+ for user in self.library_deleters:
+ with self.as_user(user):
+ result = self._delete_library(self.lib_id, expect_response=status.HTTP_200_OK)
+ assert result == {}
diff --git a/openedx/core/djangoapps/content_tagging/rest_api/v1/tests/test_views.py b/openedx/core/djangoapps/content_tagging/rest_api/v1/tests/test_views.py
index 6ba1049082d1..3b36df3f4075 100644
--- a/openedx/core/djangoapps/content_tagging/rest_api/v1/tests/test_views.py
+++ b/openedx/core/djangoapps/content_tagging/rest_api/v1/tests/test_views.py
@@ -514,12 +514,12 @@ def test_create_taxonomy(self, user_attr: str, expected_status: int) -> None:
@ddt.data(
('staff', 11),
- ("content_creatorA", 17),
- ("library_staffA", 17),
- ("library_userA", 17),
- ("instructorA", 17),
- ("course_instructorA", 17),
- ("course_staffA", 17),
+ ("content_creatorA", 22),
+ ("library_staffA", 22),
+ ("library_userA", 22),
+ ("instructorA", 22),
+ ("course_instructorA", 22),
+ ("course_staffA", 22),
)
@ddt.unpack
def test_list_taxonomy_query_count(self, user_attr: str, expected_queries: int):
@@ -1927,16 +1927,16 @@ def test_get_copied_tags(self):
('staff', 'courseA', 8),
('staff', 'libraryA', 8),
('staff', 'collection_key', 8),
- ("content_creatorA", 'courseA', 12, False),
- ("content_creatorA", 'libraryA', 12, False),
- ("content_creatorA", 'collection_key', 12, False),
- ("library_staffA", 'libraryA', 12, False), # Library users can only view objecttags, not change them?
- ("library_staffA", 'collection_key', 12, False),
- ("library_userA", 'libraryA', 12, False),
- ("library_userA", 'collection_key', 12, False),
- ("instructorA", 'courseA', 12),
- ("course_instructorA", 'courseA', 12),
- ("course_staffA", 'courseA', 12),
+ ("content_creatorA", 'courseA', 17, False),
+ ("content_creatorA", 'libraryA', 17, False),
+ ("content_creatorA", 'collection_key', 17, False),
+ ("library_staffA", 'libraryA', 17, False), # Library users can only view objecttags, not change them?
+ ("library_staffA", 'collection_key', 17, False),
+ ("library_userA", 'libraryA', 17, False),
+ ("library_userA", 'collection_key', 17, False),
+ ("instructorA", 'courseA', 17),
+ ("course_instructorA", 'courseA', 17),
+ ("course_staffA", 'courseA', 17),
)
@ddt.unpack
def test_object_tags_query_count(
diff --git a/requirements/common_constraints.txt b/requirements/common_constraints.txt
index c13c406e6176..1f3e81f50334 100644
--- a/requirements/common_constraints.txt
+++ b/requirements/common_constraints.txt
@@ -16,7 +16,7 @@
# this file from Github directly. It does not require packaging in edx-lint.
# using LTS django version
-
+Django<6.0
# elasticsearch>=7.14.0 includes breaking changes in it which caused issues in discovery upgrade process.
# elastic search changelog: https://www.elastic.co/guide/en/enterprise-search/master/release-notes-7.14.0.html
diff --git a/requirements/edx/base.txt b/requirements/edx/base.txt
index f4fa71e700fb..f3387eb7bb18 100644
--- a/requirements/edx/base.txt
+++ b/requirements/edx/base.txt
@@ -172,6 +172,7 @@ defusedxml==0.7.1
# social-auth-core
django==5.2.7
# via
+ # -c requirements/common_constraints.txt
# -c requirements/constraints.txt
# -r requirements/edx/kernel.in
# casbin-django-orm-adapter
@@ -825,7 +826,7 @@ openedx-atlas==0.7.0
# enterprise-integrated-channels
# openedx-authz
# openedx-forum
-openedx-authz==0.11.2
+openedx-authz==0.19.0
# via -r requirements/edx/kernel.in
openedx-calc==4.0.2
# via -r requirements/edx/kernel.in
diff --git a/requirements/edx/development.txt b/requirements/edx/development.txt
index 3afded2e283b..6e75615ec812 100644
--- a/requirements/edx/development.txt
+++ b/requirements/edx/development.txt
@@ -345,6 +345,7 @@ distlib==0.4.0
# virtualenv
django==5.2.7
# via
+ # -c requirements/common_constraints.txt
# -c requirements/constraints.txt
# -r requirements/edx/doc.txt
# -r requirements/edx/testing.txt
@@ -1375,7 +1376,7 @@ openedx-atlas==0.7.0
# enterprise-integrated-channels
# openedx-authz
# openedx-forum
-openedx-authz==0.11.2
+openedx-authz==0.19.0
# via
# -r requirements/edx/doc.txt
# -r requirements/edx/testing.txt
diff --git a/requirements/edx/doc.txt b/requirements/edx/doc.txt
index e25735f3961c..5e465e8905ba 100644
--- a/requirements/edx/doc.txt
+++ b/requirements/edx/doc.txt
@@ -234,6 +234,7 @@ defusedxml==0.7.1
# social-auth-core
django==5.2.7
# via
+ # -c requirements/common_constraints.txt
# -c requirements/constraints.txt
# -r requirements/edx/base.txt
# casbin-django-orm-adapter
@@ -1002,7 +1003,7 @@ openedx-atlas==0.7.0
# enterprise-integrated-channels
# openedx-authz
# openedx-forum
-openedx-authz==0.11.2
+openedx-authz==0.19.0
# via -r requirements/edx/base.txt
openedx-calc==4.0.2
# via -r requirements/edx/base.txt
diff --git a/requirements/edx/testing.txt b/requirements/edx/testing.txt
index d0faa2471265..08f7639ace60 100644
--- a/requirements/edx/testing.txt
+++ b/requirements/edx/testing.txt
@@ -261,6 +261,7 @@ distlib==0.4.0
# via virtualenv
django==5.2.7
# via
+ # -c requirements/common_constraints.txt
# -c requirements/constraints.txt
# -r requirements/edx/base.txt
# casbin-django-orm-adapter
@@ -1048,7 +1049,7 @@ openedx-atlas==0.7.0
# enterprise-integrated-channels
# openedx-authz
# openedx-forum
-openedx-authz==0.11.2
+openedx-authz==0.19.0
# via -r requirements/edx/base.txt
openedx-calc==4.0.2
# via -r requirements/edx/base.txt
diff --git a/scripts/user_retirement/requirements/base.txt b/scripts/user_retirement/requirements/base.txt
index a14836ff5e95..b726b8a49f40 100644
--- a/scripts/user_retirement/requirements/base.txt
+++ b/scripts/user_retirement/requirements/base.txt
@@ -36,6 +36,7 @@ cryptography==45.0.7
# pyjwt
django==5.2.7
# via
+ # -c requirements/common_constraints.txt
# -c requirements/constraints.txt
# django-crum
# django-waffle