diff --git a/.coveragerc b/.coveragerc index c467500f..13fb7fbb 100644 --- a/.coveragerc +++ b/.coveragerc @@ -8,3 +8,4 @@ omit = *admin.py */static/* */templates/* + */tests/* diff --git a/docs/conf.py b/docs/conf.py index 725a1e2f..ab818ce6 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -559,6 +559,7 @@ def on_init(app): # pylint: disable=unused-argument docs_path, os.path.join(root_path, "openedx_authz"), os.path.join(root_path, "openedx_authz/migrations"), + os.path.join(root_path, "openedx_authz/tests"), ] ) diff --git a/openedx_authz/__init__.py b/openedx_authz/__init__.py index 15602551..4d6d6d3a 100644 --- a/openedx_authz/__init__.py +++ b/openedx_authz/__init__.py @@ -1,5 +1,9 @@ """ -One-line description for README and other doc files. +Open edX AuthZ provides the architecture and foundations of the authorization framework. """ +import os + __version__ = "0.1.0" + +ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__)) diff --git a/openedx_authz/apps.py b/openedx_authz/apps.py index eb3e05eb..208599c9 100644 --- a/openedx_authz/apps.py +++ b/openedx_authz/apps.py @@ -11,3 +11,4 @@ class OpenedxAuthzConfig(AppConfig): """ name = "openedx_authz" + plugin_app = {} diff --git a/openedx_authz/engine/config/authz.policy b/openedx_authz/engine/config/authz.policy new file mode 100644 index 00000000..2bae77ed --- /dev/null +++ b/openedx_authz/engine/config/authz.policy @@ -0,0 +1,11 @@ +# ===== ACTION GROUPING (g2) ===== + +# manage implies edit, delete, read, write +g2, act:manage, act:edit +g2, act:manage, act:delete +g2, act:edit, act:read +g2, act:edit, act:write + +# edit implies read, write +g2, act:edit, act:read +g2, act:edit, act:write diff --git a/openedx_authz/engine/config/model.conf b/openedx_authz/engine/config/model.conf new file mode 100644 index 00000000..e3e2ae9c --- /dev/null +++ b/openedx_authz/engine/config/model.conf @@ -0,0 +1,92 @@ +############################################ +# Open edX AuthZ — Casbin Model Configuration +# +# This model supports: +# - Scoped role assignments (user roles tied to specific contexts) +# - Action grouping (manage → read/write/edit/delete to reduce duplication) +# - System-wide roles (global scope "*" applies everywhere) +# - Negative rules (deny overrides allow for exceptions) +# - Namespace support (course:*, lib:*, org:*, etc.) +# - Extensibility (new resource types just need new namespaces) +############################################ + +[request_definition] +# Request format: subject (user), action, scope (specific resource being accessed) +# +# sub = subject/principal with namespace (e.g., "user:alice", "service:lms") +# act = action with namespace (e.g., "act:read", "act:manage", "act:edit-courses") +# scope = authorization scope context (e.g., "org:OpenedX", "course-v1:...", "*" for global) +# +# SCOPE SEMANTICS: +# Scope determines the authorization context and which role assignments apply +# - "*" = global scope (system-wide roles apply everywhere) +# - "org:..." = organization-scoped roles (apply within specific organization) +# - "course-v1:..." = course-scoped roles (apply within specific course) +# - "lib:..." = library-scoped roles (apply within specific library) +# +# Application must provide appropriate scope based on business logic. +r = sub, act, scope + +[policy_definition] +# Policy format: subject (role), action, scope (pattern), effect +# +# sub = role or user with namespace (e.g., "role:org_admin", "user:bob") +# act = action identifier (e.g., "act:manage", "act:read", "act:edit-courses") +# scope = scope where policy applies (e.g., "*", "org:*", "course-v1:*", "lib:*") +# eft = "allow" or "deny" (deny overrides allow for exceptions) +p = sub, act, scope, eft + +[role_definition] +# g: Role assignments with scope +# Format: user/subject, role, scope +# +# Examples: +# g, user:alice, role:org_admin, org:OpenedX # Alice is org admin for OpenedX +# g, user:bob, role:course_instructor, course-v1:... # Bob is instructor for specific course +# g, user:carol, role:library_admin, * # Carol is global library admin +# +# Role hierarchy (optional): +# g, role:org_admin, role:org_editor, org:OpenedX # org_admin inherits org_editor permissions +g = _, _, _ + +# g2: Action grouping and implications +# Maps high-level actions to specific actions to reduce policy duplication +# +# Examples: +# g2, act:manage, act:edit # manage implies edit +# g2, act:manage, act:delete # manage implies delete +# g2, act:edit-courses, act:read # edit-courses implies read (for resource access) +# g2, act:edit-courses, act:write # edit-courses implies write (for resource modification) +g2 = _, _ + +[policy_effect] +# Deny-override policy: allow if any rule allows AND no rule denies +# This enables negative rules/exceptions (e.g., "manage all courses except course Z") +# +# Evaluation order: +# 1. Check if any policy grants allow +# 2. Check if any policy specifies deny +# 3. If deny found, result is deny (exceptions win) +# 4. If allow found and no deny, result is allow +# 5. If no matches, result is deny (default secure) +e = some(where (p.eft == allow)) && !some(where (p.eft == deny)) + +[matchers] +# Authorization matching logic +# +# ROLE MATCHING: +# - g(r.sub, p.sub, r.scope): check if subject has role in requested scope +# - g(r.sub, p.sub, "*"): check if subject has role in all resources in the scope +# +# SCOPE MATCHING: +# - keyMatch(r.scope, p.scope): scope matches pattern +# +# ACTION MATCHING: +# - r.act == p.act: exact action match +# - g2(p.act, r.act): policy action implies requested action via grouping +# +# All conditions must be true for a policy to match: +# 1. Subject must have role in scope OR global role +# 2. Scope must match pattern +# 3. Action must match OR inherit via action grouping +m = (g(r.sub, p.sub, r.scope) || g(r.sub, p.sub, "*")) && keyMatch(r.scope, p.scope) && (r.act == p.act || g2(p.act, r.act)) diff --git a/openedx_authz/management/__init__.py b/openedx_authz/management/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/openedx_authz/management/commands/__init__.py b/openedx_authz/management/commands/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/openedx_authz/management/commands/enforcement.py b/openedx_authz/management/commands/enforcement.py new file mode 100644 index 00000000..32719f29 --- /dev/null +++ b/openedx_authz/management/commands/enforcement.py @@ -0,0 +1,191 @@ +""" +Django management command for interactive Casbin enforcement testing. + +This command creates a Casbin enforcer using the model.conf configuration and a +user-specified policy file, then provides an interactive mode for testing +authorization enforcement requests. + +The command supports: +- Loading Casbin model from the built-in model.conf file or a custom file (specified via --model-file-path argument) +- Using custom policy files (specified via --policy-file-path argument) +- Interactive testing with format: subject action scope +- Real-time enforcement results with visual feedback (✓ ALLOWED / ✗ DENIED) +- Display of loaded policies, role assignments, and action grouping rules + +Example usage: + python manage.py enforcement --policy-file-path /path/to/authz.policy + + python manage.py enforcement --policy-file-path /path/to/authz.policy --model-file-path /path/to/model.conf + +Example test input: + user:alice act:read org:OpenedX +""" + +import argparse +import os + +import casbin +from django.core.management.base import BaseCommand, CommandError + +from openedx_authz import ROOT_DIRECTORY + + +class Command(BaseCommand): + """ + Django management command for interactive Casbin enforcement testing. + + This command loads a Casbin model configuration and user-specified policy file + to create an enforcer instance, then provides an interactive shell for testing + authorization requests in real-time with immediate feedback. + """ + + help = ( + "Interactive mode for testing Casbin enforcement policies using a custom model file and" + "a custom policy file. Provides real-time authorization testing with format: subject action scope. " + "Use --policy-file-path to specify the policy file location. " + "Use --model-file-path to specify the model file location. " + ) + + def add_arguments(self, parser: argparse.ArgumentParser) -> None: + """Add command-line arguments to the argument parser. + + Args: + parser (argparse.ArgumentParser): The Django argument parser instance to configure. + """ + parser.add_argument( + "--policy-file-path", + type=str, + required=True, + help="Path to the Casbin policy file (supports CSV format with policies, roles, and action grouping)", + ) + parser.add_argument( + "--model-file-path", + type=str, + required=False, + help="Path to the Casbin model file. If not provided, the default model.conf file will be used.", + ) + + def handle(self, *args, **options): + """Execute the enforcement testing command. + + Loads the Casbin model and policy files, creates an enforcer instance, + displays configuration summary, and starts the interactive testing mode. + + Args: + *args: Positional command arguments (unused). + **options: Command options including `policy_file_path` and `model_file_path`. + + Raises: + CommandError: If model or policy files are not found or enforcer creation fails. + """ + model_file_path = self._get_file_path("model.conf") or options["model_file_path"] + policy_file_path = options["policy_file_path"] + + if not os.path.isfile(model_file_path): + raise CommandError(f"Model file not found: {model_file_path}") + if not os.path.isfile(policy_file_path): + raise CommandError(f"Policy file not found: {policy_file_path}") + + self.stdout.write(self.style.SUCCESS("Casbin Interactive Enforcement")) + self.stdout.write(f"Model file path: {model_file_path}") + self.stdout.write(f"Policy file path: {policy_file_path}") + self.stdout.write("") + + try: + enforcer = casbin.Enforcer(model_file_path, policy_file_path) + self.stdout.write(self.style.SUCCESS("Casbin enforcer created successfully")) + + policies = enforcer.get_policy() + roles = enforcer.get_grouping_policy() + action_grouping = enforcer.get_named_grouping_policy("g2") + + self.stdout.write(f"✓ Loaded {len(policies)} policies") + self.stdout.write(f"✓ Loaded {len(roles)} role assignments") + self.stdout.write(f"✓ Loaded {len(action_grouping)} action grouping rules") + self.stdout.write("") + + self._run_interactive_mode(enforcer) + + except Exception as e: + raise CommandError(f"Error creating Casbin enforcer: {str(e)}") from e + + def _get_file_path(self, file_name: str) -> str: + """Construct the full file path for a configuration file. + + Args: + file_name (str): The name of the configuration file (e.g., 'model.conf'). + + Returns: + str: The absolute path to the configuration file in the engine/config directory. + """ + return os.path.join(ROOT_DIRECTORY, "engine", "config", file_name) + + def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None: + """Start the interactive enforcement testing shell. + + Provides a continuous loop where users can input enforcement requests + in the format 'subject action scope' and receive immediate + authorization results with visual feedback. + + Args: + enforcer (casbin.Enforcer): The configured Casbin enforcer instance for testing. + + Note: + Exit the interactive mode with Ctrl+C or Ctrl+D. + """ + self.stdout.write(self.style.SUCCESS("Interactive Mode")) + self.stdout.write("Test custom enforcement requests interactively.") + self.stdout.write("Enter 'quit', 'exit', or 'q' to exit the interactive mode.") + self.stdout.write("") + self.stdout.write("Format: subject action scope") + self.stdout.write("Example: user:alice act:read org:OpenedX") + self.stdout.write("") + + while True: + try: + user_input = input("Enter enforcement test: ").strip() + + if not user_input: + continue + + if user_input.lower() in ["quit", "exit", "q"]: + break + + self._test_interactive_request(enforcer, user_input) + except (KeyboardInterrupt, EOFError): + self.stdout.write(self.style.ERROR("Exiting interactive mode...")) + break + + def _test_interactive_request(self, enforcer: casbin.Enforcer, user_input: str) -> None: + """Process and test a single enforcement request from user input. + + Parses the input string, validates the format, executes the enforcement + check, and displays the result with appropriate styling. + + Args: + enforcer (casbin.Enforcer): The Casbin enforcer instance to use for testing. + user_input (str): The user's input string in format 'subject action scope'. + + Expected format: + subject: The requesting entity (e.g., 'user:alice') + action: The requested action (e.g., 'act:read') + scope: The authorization context (e.g., 'org:OpenedX') + """ + try: + parts = [part.strip() for part in user_input.split()] + if len(parts) != 3: + self.stdout.write(self.style.ERROR(f"✗ Invalid format. Expected 3 parts, got {len(parts)}")) + self.stdout.write("Format: subject action scope") + self.stdout.write("Example: user:alice act:read org:OpenedX") + return + + subject, action, scope = parts + result = enforcer.enforce(subject, action, scope) + + if result: + self.stdout.write(self.style.SUCCESS(f"✓ ALLOWED: {subject} {action} {scope}")) + else: + self.stdout.write(self.style.ERROR(f"✗ DENIED: {subject} {action} {scope}")) + + except (ValueError, IndexError, TypeError) as e: + self.stdout.write(self.style.ERROR(f"✗ Error processing request: {str(e)}")) diff --git a/openedx_authz/tests/__init__.py b/openedx_authz/tests/__init__.py new file mode 100644 index 00000000..478e545a --- /dev/null +++ b/openedx_authz/tests/__init__.py @@ -0,0 +1,5 @@ +""" +Tests package for openedx_authz. + +This package contains all tests for the openedx_authz application. +""" diff --git a/openedx_authz/tests/test_commands.py b/openedx_authz/tests/test_commands.py new file mode 100644 index 00000000..0a371480 --- /dev/null +++ b/openedx_authz/tests/test_commands.py @@ -0,0 +1,193 @@ +""" +Tests for the `enforcement` Django management command. +""" + +import io +from tempfile import TemporaryFile +from unittest import TestCase +from unittest.mock import Mock, patch + +from ddt import data, ddt +from django.core.management import call_command +from django.core.management.base import CommandError + +from openedx_authz.management.commands.enforcement import Command as EnforcementCommand + + +# pylint: disable=protected-access +@ddt +class EnforcementCommandTests(TestCase): + """ + Tests for the `enforcement` Django management command. + + This test class verifies the behavior of the enforcement command, including: + - Argument validation and error handling + - File existence checks for policy and model files + - Enforcer initialization and error scenarios + - Interactive mode functionality + - Command output and user feedback + """ + + def setUp(self): + super().setUp() + self.buffer = io.StringIO() + self.policy_file_path = TemporaryFile() + self.command = EnforcementCommand() + self.command.stdout = self.buffer + self.enforcer = Mock() + + def test_requires_policy_file_argument(self): + """Test that calling the command without --policy-file-path should error from argparse.""" + with self.assertRaises(CommandError) as ctx: + call_command("enforcement") + + self.assertEqual("Error: the following arguments are required: --policy-file-path", str(ctx.exception)) + + def test_policy_file_not_found_raises(self): + """Test that command errors when the provided policy file does not exist.""" + non_existent = "invalid/path/does-not-exist.policy" + + with self.assertRaises(CommandError) as ctx: + call_command("enforcement", policy_file_path=non_existent) + + self.assertEqual(f"Policy file not found: {non_existent}", str(ctx.exception)) + + @patch.object(EnforcementCommand, "_get_file_path", return_value="invalid/path/model.conf") + def test_model_file_not_found_raises(self, mock_get_file_path: Mock): + """Test that command errors when the provided model file does not exist.""" + with self.assertRaises(CommandError) as ctx: + call_command("enforcement", policy_file_path=self.policy_file_path.name) + + self.assertEqual(f"Model file not found: {mock_get_file_path.return_value}", str(ctx.exception)) + + @patch("openedx_authz.management.commands.enforcement.casbin.Enforcer") + def test_error_creating_enforcer_raises(self, mock_enforcer_cls: Mock): + """Test that command errors when the enforcer creation fails.""" + mock_enforcer_cls.side_effect = Exception("Enforcer creation error") + + with self.assertRaises(CommandError) as ctx: + call_command("enforcement", policy_file_path=self.policy_file_path.name) + + self.assertEqual("Error creating Casbin enforcer: Enforcer creation error", str(ctx.exception)) + + @patch("openedx_authz.management.commands.enforcement.casbin.Enforcer") + @patch.object(EnforcementCommand, "_run_interactive_mode") + def test_successful_run_prints_summary(self, mock_run_interactive: Mock, mock_enforcer_cls: Mock): + """ + Test successful command execution with policy file and interactive mode. + When files exist, command should create enforcer, print counts, and call interactive loop. + """ + mock_enforcer = Mock() + policies = [["p", "role:platform_admin", "act:manage", "*", "allow"]] + roles = [["g", "user:user-1", "role:platform_admin", "*"]] + action_grouping = [ + ["g2", "act:edit", "act:read"], + ["g2", "act:edit", "act:write"], + ] + mock_enforcer.get_policy.return_value = policies + mock_enforcer.get_grouping_policy.return_value = roles + mock_enforcer.get_named_grouping_policy.return_value = action_grouping + mock_enforcer_cls.return_value = mock_enforcer + + call_command("enforcement", policy_file_path=self.policy_file_path.name, stdout=self.buffer) + + output = self.buffer.getvalue() + self.assertIn("Casbin Interactive Enforcement", output) + self.assertIn("Casbin enforcer created successfully", output) + self.assertIn(f"✓ Loaded {len(policies)} policies", output) + self.assertIn(f"✓ Loaded {len(roles)} role assignments", output) + self.assertIn(f"✓ Loaded {len(action_grouping)} action grouping rules", output) + mock_run_interactive.assert_called_once_with(mock_enforcer) + + def test_run_interactive_mode_displays_help(self): + """Test that the interactive mode runs.""" + with patch("builtins.input", side_effect=["quit"]): + self.command._run_interactive_mode(self.enforcer) + + self.assertIn("Interactive Mode", self.buffer.getvalue()) + self.assertIn("Test custom enforcement requests interactively.", self.buffer.getvalue()) + self.assertIn("Enter 'quit', 'exit', or 'q' to exit the interactive mode.", self.buffer.getvalue()) + self.assertIn("Format: subject action scope", self.buffer.getvalue()) + self.assertIn("Example: user:alice act:read org:OpenedX", self.buffer.getvalue()) + + def test_run_interactive_mode_maintains_interactive_loop(self): + """Test that the interactive mode maintains the interactive loop.""" + input_values = ["", "", "", "quit"] + + with patch("builtins.input", side_effect=input_values) as mock_input: + self.command._run_interactive_mode(self.enforcer) + + self.assertEqual(mock_input.call_count, len(input_values)) + + @data( + ["user:alice act:read org:OpenedX"], + ["user:bob act:read org:OpenedX"] * 5, + ["user:john act:read org:OpenedX"] * 10, + ) + def test_run_interactive_mode_processes_request(self, user_input: list[str]): + """Test that the interactive mode processes the request.""" + with patch("builtins.input", side_effect=user_input + ["quit"]) as mock_input: + with patch.object(self.command, "_test_interactive_request") as mock_method: + self.command._run_interactive_mode(self.enforcer) + + self.assertEqual(mock_input.call_count, len(user_input) + 1) + self.assertEqual(mock_method.call_count, len(user_input)) + for value in user_input: + mock_method.assert_any_call(self.enforcer, value) + + @data("quit", "exit", "q", "QUIT", "EXIT", "Q") + def test_quit_commands_case_insensitive(self, quit_command: str): + """Test that all quit commands work regardless of case.""" + with patch("builtins.input", side_effect=[quit_command]) as mock_input: + self.command._run_interactive_mode(self.enforcer) + + self.assertEqual(mock_input.call_count, 1) + + @data(KeyboardInterrupt(), EOFError()) + def test_handles_exceptions(self, exception: Exception): + """Test that interactive mode handles exceptions gracefully.""" + with patch("builtins.input", side_effect=exception): + self.command._run_interactive_mode(self.enforcer) + + self.assertIn("Exiting interactive mode...", self.buffer.getvalue()) + + def test_interactive_request_allowed(self): + """Test that `_test_interactive_request` prints allowed output format.""" + self.enforcer.enforce.return_value = True + user_input = "user:alice act:read org:OpenedX" + + self.command._test_interactive_request(self.enforcer, user_input) + + allowed_output = self.buffer.getvalue() + self.assertIn(f"✓ ALLOWED: {user_input}", allowed_output) + + def test_interactive_request_denied(self): + """Test that `_test_interactive_request` prints denied output format.""" + self.enforcer.enforce.return_value = False + user_input = "user:alice act:delete org:OpenedX" + + self.command._test_interactive_request(self.enforcer, user_input) + + denied_output = self.buffer.getvalue() + self.assertIn(f"✗ DENIED: {user_input}", denied_output) + + def test_interactive_request_invalid_format(self): + """Test that `_test_interactive_request` reports invalid input format.""" + user_input = "user:alice act:read" + + self.command._test_interactive_request(self.enforcer, user_input) + + invalid_output = self.buffer.getvalue() + self.assertIn("✗ Invalid format. Expected 3 parts, got 2", invalid_output) + self.assertIn("Format: subject action scope", invalid_output) + self.assertIn(f"Example: {user_input} org:OpenedX", invalid_output) + + @data(ValueError(), IndexError(), TypeError()) + def test_interactive_request_error(self, exception: Exception): + """Test that `_test_interactive_request` handles processing errors.""" + self.enforcer.enforce.side_effect = exception + + self.command._test_interactive_request(self.enforcer, "user:alice act:read org:OpenedX") + + error_output = self.buffer.getvalue() + self.assertIn(f"✗ Error processing request: {str(exception)}", error_output) diff --git a/openedx_authz/tests/test_enforcement.py b/openedx_authz/tests/test_enforcement.py new file mode 100644 index 00000000..6f30f62b --- /dev/null +++ b/openedx_authz/tests/test_enforcement.py @@ -0,0 +1,442 @@ +""" +Comprehensive test suite for Open edX authorization enforcement using Casbin. + +This module validates the authorization system implemented with Casbin, testing +various aspects of the permission model. +""" + +import os +from typing import TypedDict +from unittest import TestCase + +import casbin +from ddt import data, ddt, unpack + +from openedx_authz import ROOT_DIRECTORY + + +class AuthRequest(TypedDict): + """ + Represents an authorization request with all necessary parameters. + """ + + subject: str + action: str + scope: str + expected_result: bool + + +COMMON_ACTION_GROUPING = [ + # manage implies edit and delete + ["g2", "act:manage", "act:edit"], + ["g2", "act:manage", "act:delete"], + # edit implies read and write + ["g2", "act:edit", "act:read"], + ["g2", "act:edit", "act:write"], +] + + +@ddt +class CasbinEnforcementTestCase(TestCase): + """ + Test case for Casbin enforcement policies. + + This test class loads the model.conf and the provided policies and runs + enforcement tests for different user roles and permissions. + """ + + @classmethod + def setUpClass(cls) -> None: + """Set up the Casbin enforcer.""" + super().setUpClass() + + engine_config_dir = os.path.join(ROOT_DIRECTORY, "engine", "config") + model_file = os.path.join(engine_config_dir, "model.conf") + + if not os.path.isfile(model_file): + raise FileNotFoundError(f"Model file not found: {model_file}") + + cls.enforcer = casbin.Enforcer(model_file) + + def _load_policy(self, policy: list[str]) -> None: + """ + Load policy rules into the Casbin enforcer. + + This method clears any existing policies and loads the provided policy rules + into the appropriate policy stores (p for policies, g for role assignments, + g2 for action groupings). + + Args: + policy (list[str]): List of policy rules where each rule is a + list starting with the rule type ('p', 'g', or 'g2') followed by + the rule parameters. + + Raises: + ValueError: If a policy rule has an invalid type (not 'p', 'g', or 'g2'). + """ + self.enforcer.clear_policy() + for rule in policy: + if rule[0] == "p": + self.enforcer.add_named_policy("p", rule[1:]) + elif rule[0] == "g": + self.enforcer.add_named_grouping_policy("g", rule[1:]) + elif rule[0] == "g2": + self.enforcer.add_named_grouping_policy("g2", rule[1:]) + else: + raise ValueError(f"Invalid policy rule: {rule}") + + def _test_enforcement(self, policy: list[str], request: AuthRequest) -> None: + """ + Helper method to test enforcement and provide detailed feedback. + + Args: + policy (list[str]): A list of policy rules to load into the enforcer + request (AuthRequest): An authorization request containing all necessary parameters + """ + self._load_policy(policy) + subject, action, scope = request["subject"], request["action"], request["scope"] + result = self.enforcer.enforce(subject, action, scope) + error_msg = f"Request: {subject} {action} {scope}" + self.assertEqual(result, request["expected_result"], error_msg) + + +@ddt +class SystemWideRoleTests(CasbinEnforcementTestCase): + """ + Tests for system-wide roles with global access permissions. + + This test class verifies that users assigned to system-wide roles (with global scope "*") + can access resources across all scopes and namespaces. Platform administrators should + have unrestricted access to manage any resource in the system, regardless of the + specific scope (organization, course, library, etc.). + """ + + POLICY = [ + ["p", "role:platform_admin", "act:manage", "*", "allow"], + ["g", "user:user-1", "role:platform_admin", "*"], + ] + COMMON_ACTION_GROUPING + + GENERAL_CASES = [ + { + "subject": "user:user-1", + "action": "act:manage", + "scope": "*", + "expected_result": True, + }, + { + "subject": "user:user-1", + "action": "act:manage", + "scope": "org:any-org", + "expected_result": True, + }, + { + "subject": "user:user-1", + "action": "act:manage", + "scope": "course:course-v1:any-org+any-course+any-course-run", + "expected_result": True, + }, + { + "subject": "user:user-1", + "action": "act:manage", + "scope": "lib:lib:any-org:any-library", + "expected_result": True, + }, + ] + + @data(*GENERAL_CASES) + def test_platform_admin_general_access(self, request: AuthRequest): + """Test that platform administrators have full access to all resources.""" + self._test_enforcement(self.POLICY, request) + + +@ddt +class ActionGroupingTests(CasbinEnforcementTestCase): + """ + Tests for action grouping and permission inheritance. + + This test class verifies that action grouping works correctly, where high-level + actions (like 'manage') automatically grant access to lower-level actions + (like 'edit', 'read', 'write', 'delete') through the g2 grouping mechanism. + """ + + POLICY = [ + ["p", "role:role-1", "act:manage", "org:*", "allow"], + ["g", "user:user-1", "role:role-1", "org:any-org"], + ] + COMMON_ACTION_GROUPING + + CASES = [ + { + "subject": "user:user-1", + "action": "act:edit", + "scope": "org:any-org", + "expected_result": True, + }, + { + "subject": "user:user-1", + "action": "act:read", + "scope": "org:any-org", + "expected_result": True, + }, + { + "subject": "user:user-1", + "action": "act:write", + "scope": "org:any-org", + "expected_result": True, + }, + { + "subject": "user:user-1", + "action": "act:delete", + "scope": "org:any-org", + "expected_result": True, + }, + ] + + @data(*CASES) + def test_action_grouping_access(self, request: AuthRequest): + """Test that users have access through action grouping.""" + self._test_enforcement(self.POLICY, request) + + +@ddt +class RoleAssignmentTests(CasbinEnforcementTestCase): + """ + Tests for role assignment and scoped authorization. + + This test class verifies that users with different roles can access resources + within their assigned scopes. + """ + + POLICY = [ + # Policies + ["p", "role:platform_admin", "act:manage", "*", "allow"], + ["p", "role:org_admin", "act:manage", "org:*", "allow"], + ["p", "role:org_editor", "act:edit", "org:*", "allow"], + ["p", "role:org_author", "act:write", "org:*", "allow"], + ["p", "role:course_admin", "act:manage", "course:*", "allow"], + ["p", "role:library_admin", "act:manage", "lib:*", "allow"], + ["p", "role:library_editor", "act:edit", "lib:*", "allow"], + ["p", "role:library_reviewer", "act:read", "lib:*", "allow"], + ["p", "role:library_author", "act:write", "lib:*", "allow"], + # Role assignments + ["g", "user:user-1", "role:platform_admin", "*"], + ["g", "user:user-2", "role:org_admin", "org:any-org"], + ["g", "user:user-3", "role:org_editor", "org:any-org"], + ["g", "user:user-4", "role:org_author", "org:any-org"], + ["g", "user:user-5", "role:course_admin", "course:course-v1:any-org+any-course+any-course-run"], + ["g", "user:user-6", "role:library_admin", "lib:lib:any-org:any-library"], + ["g", "user:user-7", "role:library_editor", "lib:lib:any-org:any-library"], + ["g", "user:user-8", "role:library_reviewer", "lib:lib:any-org:any-library"], + ["g", "user:user-9", "role:library_author", "lib:lib:any-org:any-library"], + ] + COMMON_ACTION_GROUPING + + CASES = [ + { + "subject": "user:user-1", + "action": "act:manage", + "scope": "org:any-org", + "expected_result": True, + }, + { + "subject": "user:user-2", + "action": "act:manage", + "scope": "org:any-org", + "expected_result": True, + }, + { + "subject": "user:user-3", + "action": "act:edit", + "scope": "org:any-org", + "expected_result": True, + }, + { + "subject": "user:user-4", + "action": "act:write", + "scope": "org:any-org", + "expected_result": True, + }, + { + "subject": "user:user-5", + "action": "act:manage", + "scope": "course:course-v1:any-org+any-course+any-course-run", + "expected_result": True, + }, + { + "subject": "user:user-6", + "action": "act:manage", + "scope": "lib:lib:any-org:any-library", + "expected_result": True, + }, + { + "subject": "user:user-7", + "action": "act:edit", + "scope": "lib:lib:any-org:any-library", + "expected_result": True, + }, + { + "subject": "user:user-8", + "action": "act:read", + "scope": "lib:lib:any-org:any-library", + "expected_result": True, + }, + { + "subject": "user:user-9", + "action": "act:write", + "scope": "lib:lib:any-org:any-library", + "expected_result": True, + }, + ] + + @data(*CASES) + def test_role_assignment_access(self, request: AuthRequest): + """Test that users have access through role assignment.""" + self._test_enforcement(self.POLICY, request) + + +@ddt +class DeniedAccessTests(CasbinEnforcementTestCase): + """Tests for denied access scenarios. + + This test class verifies that the authorization system correctly denies access + when explicit deny rules override allow rules. + """ + + POLICY = [ + ["p", "role:platform_admin", "act:manage", "*", "allow"], + ["p", "role:platform_admin", "act:manage", "org:restricted-org", "deny"], + ["g", "user:user-1", "role:platform_admin", "*"], + ] + COMMON_ACTION_GROUPING + + CASES = [ + { + "subject": "user:user-1", + "action": "act:manage", + "scope": "org:allowed-org", + "expected_result": True, + }, + { + "subject": "user:user-1", + "action": "act:manage", + "scope": "org:restricted-org", + "expected_result": False, + }, + { + "subject": "user:user-1", + "action": "act:edit", + "scope": "org:restricted-org", + "expected_result": False, + }, + { + "subject": "user:user-1", + "action": "act:read", + "scope": "org:restricted-org", + "expected_result": False, + }, + { + "subject": "user:user-1", + "action": "act:write", + "scope": "org:restricted-org", + "expected_result": False, + }, + { + "subject": "user:user-1", + "action": "act:delete", + "scope": "org:restricted-org", + "expected_result": False, + }, + ] + + @data(*CASES) + def test_denied_access(self, request: AuthRequest): + """Test that users have denied access.""" + self._test_enforcement(self.POLICY, request) + + +@ddt +class WildcardScopeTests(CasbinEnforcementTestCase): + """Tests for wildcard scope authorization patterns. + + Verifies that users with roles assigned to wildcard scopes (like "*" for global access + or "org:*" for organization-wide access) can properly access resources within their + authorized scope boundaries. + """ + + POLICY = [ + # Policies + ["p", "role:platform_admin", "act:manage", "*", "allow"], + ["p", "role:org_admin", "act:manage", "org:*", "allow"], + ["p", "role:course_admin", "act:manage", "course:*", "allow"], + ["p", "role:library_admin", "act:manage", "lib:*", "allow"], + # Role assignments + ["g", "user:user-1", "role:platform_admin", "*"], + ["g", "user:user-2", "role:org_admin", "*"], + ["g", "user:user-3", "role:course_admin", "*"], + ["g", "user:user-4", "role:library_admin", "*"], + ] + COMMON_ACTION_GROUPING + + @data( + ("*", True), + ("org:MIT", True), + ("course:course-v1:OpenedX+DemoX+CS101", True), + ("lib:lib:OpenedX:math-basics", True), + ) + @unpack + def test_wildcard_global_access(self, scope: str, expected_result: bool): + """Test that users have access through wildcard global scope.""" + request = { + "subject": "user:user-1", + "action": "act:manage", + "scope": scope, + "expected_result": expected_result, + } + self._test_enforcement(self.POLICY, request) + + @data( + ("*", False), + ("org:MIT", True), + ("course:course-v1:OpenedX+DemoX+CS101", False), + ("lib:lib:OpenedX:math-basics", False), + ) + @unpack + def test_wildcard_org_access(self, scope: str, expected_result: bool): + """Test that users have access through wildcard org scope.""" + request = { + "subject": "user:user-2", + "action": "act:manage", + "scope": scope, + "expected_result": expected_result, + } + self._test_enforcement(self.POLICY, request) + + @data( + ("*", False), + ("org:MIT", False), + ("course:course-v1:OpenedX+DemoX+CS101", True), + ("lib:lib:OpenedX:math-basics", False), + ) + @unpack + def test_wildcard_course_access(self, scope: str, expected_result: bool): + """Test that users have access through wildcard course scope.""" + request = { + "subject": "user:user-3", + "action": "act:manage", + "scope": scope, + "expected_result": expected_result, + } + self._test_enforcement(self.POLICY, request) + + @data( + ("*", False), + ("org:MIT", False), + ("course:course-v1:OpenedX+DemoX+CS101", False), + ("lib:lib:OpenedX:math-basics", True), + ) + @unpack + def test_wildcard_library_access(self, scope: str, expected_result: bool): + """Test that users have access through wildcard library scope.""" + request = { + "subject": "user:user-4", + "action": "act:manage", + "scope": scope, + "expected_result": expected_result, + } + self._test_enforcement(self.POLICY, request) diff --git a/requirements/base.in b/requirements/base.in index 9f4002ee..ad8b6ced 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -5,3 +5,4 @@ Django # Web application framework openedx-atlas +pycasbin # Authorization library for implementing access control models diff --git a/requirements/base.txt b/requirements/base.txt index d7690f80..0d23b15c 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -12,5 +12,9 @@ django==4.2.23 # -r requirements/base.in openedx-atlas==0.7.0 # via -r requirements/base.in +pycasbin==2.2.0 + # via -r requirements/base.in +simpleeval==1.0.3 + # via pycasbin sqlparse==0.5.3 # via django diff --git a/requirements/dev.txt b/requirements/dev.txt index 47bf74ab..58806021 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -50,6 +50,8 @@ coverage[toml]==7.10.5 # via # -r requirements/quality.txt # pytest-cov +ddt==1.7.2 + # via -r requirements/quality.txt diff-cover==9.6.0 # via -r requirements/dev.in dill==0.4.0 @@ -133,6 +135,8 @@ pluggy==1.6.0 # tox polib==1.2.0 # via edx-i18n-tools +pycasbin==2.2.0 + # via -r requirements/quality.txt pycodestyle==2.14.0 # via -r requirements/quality.txt pydocstyle==6.3.0 @@ -189,6 +193,10 @@ pyyaml==6.0.2 # -r requirements/quality.txt # code-annotations # edx-i18n-tools +simpleeval==1.0.3 + # via + # -r requirements/quality.txt + # pycasbin six==1.17.0 # via # -r requirements/quality.txt diff --git a/requirements/doc.txt b/requirements/doc.txt index a16c0732..ff1044df 100644 --- a/requirements/doc.txt +++ b/requirements/doc.txt @@ -40,6 +40,8 @@ coverage[toml]==7.10.5 # pytest-cov cryptography==45.0.6 # via secretstorage +ddt==1.7.2 + # via -r requirements/test.txt django==4.2.23 # via # -c https:/raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt @@ -111,6 +113,8 @@ pluggy==1.6.0 # -r requirements/test.txt # pytest # pytest-cov +pycasbin==2.2.0 + # via -r requirements/test.txt pycparser==2.22 # via cffi pydata-sphinx-theme==0.15.4 @@ -164,6 +168,10 @@ roman-numerals-py==3.1.0 # via sphinx secretstorage==3.3.3 # via keyring +simpleeval==1.0.3 + # via + # -r requirements/test.txt + # pycasbin snowballstemmer==3.0.1 # via sphinx soupsieve==2.8 diff --git a/requirements/quality.txt b/requirements/quality.txt index 0340e588..1f0574f3 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -28,6 +28,8 @@ coverage[toml]==7.10.5 # via # -r requirements/test.txt # pytest-cov +ddt==1.7.2 + # via -r requirements/test.txt dill==0.4.0 # via pylint django==4.2.23 @@ -67,6 +69,8 @@ pluggy==1.6.0 # -r requirements/test.txt # pytest # pytest-cov +pycasbin==2.2.0 + # via -r requirements/test.txt pycodestyle==2.14.0 # via -r requirements/quality.in pydocstyle==6.3.0 @@ -106,6 +110,10 @@ pyyaml==6.0.2 # via # -r requirements/test.txt # code-annotations +simpleeval==1.0.3 + # via + # -r requirements/test.txt + # pycasbin six==1.17.0 # via edx-lint snowballstemmer==3.0.1 diff --git a/requirements/test.in b/requirements/test.in index 6797160b..276dfd90 100644 --- a/requirements/test.in +++ b/requirements/test.in @@ -6,3 +6,4 @@ pytest-cov # pytest extension for code coverage statistics pytest-django # pytest extension for better Django support code-annotations # provides commands used by the pii_check make target. +ddt # data-driven tests diff --git a/requirements/test.txt b/requirements/test.txt index 9cf395ae..d0f0f363 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -14,6 +14,8 @@ code-annotations==2.3.0 # via -r requirements/test.in coverage[toml]==7.10.5 # via pytest-cov +ddt==1.7.2 + # via -r requirements/test.in # via # -c https:/raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt # -r requirements/base.txt @@ -31,6 +33,8 @@ pluggy==1.6.0 # via # pytest # pytest-cov +pycasbin==2.2.0 + # via -r requirements/base.txt pygments==2.19.2 # via pytest pytest==8.4.1 @@ -45,6 +49,10 @@ python-slugify==8.0.4 # via code-annotations pyyaml==6.0.2 # via code-annotations +simpleeval==1.0.3 + # via + # -r requirements/base.txt + # pycasbin sqlparse==0.5.3 # via # -r requirements/base.txt diff --git a/setup.py b/setup.py index b0f66915..217bb067 100755 --- a/setup.py +++ b/setup.py @@ -159,4 +159,12 @@ def is_requirement(line): "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", ], + entry_points={ + "lms.djangoapp": [ + "openedx_authz = openedx_authz.apps:OpenedxAuthzConfig", + ], + "cms.djangoapp": [ + "openedx_authz = openedx_authz.apps:OpenedxAuthzConfig", + ], + }, )