Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ omit =
*admin.py
*/static/*
*/templates/*
*/tests/*
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ def on_init(app): # pylint: disable=unused-argument
docs_path,
os.path.join(root_path, "openedx_authz"),
os.path.join(root_path, "openedx_authz/migrations"),
os.path.join(root_path, "openedx_authz/tests"),
]
)

Expand Down
6 changes: 5 additions & 1 deletion openedx_authz/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
"""
One-line description for README and other doc files.
Open edX AuthZ provides the architecture and foundations of the authorization framework.
"""

import os

__version__ = "0.1.0"

ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__))
1 change: 1 addition & 0 deletions openedx_authz/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ class OpenedxAuthzConfig(AppConfig):
"""

name = "openedx_authz"
plugin_app = {}
11 changes: 11 additions & 0 deletions openedx_authz/engine/config/authz.policy
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# ===== ACTION GROUPING (g2) =====

# manage implies edit, delete, read, write
g2, act:manage, act:edit
g2, act:manage, act:delete
g2, act:edit, act:read
g2, act:edit, act:write

# edit implies read, write
g2, act:edit, act:read
g2, act:edit, act:write
92 changes: 92 additions & 0 deletions openedx_authz/engine/config/model.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
############################################
# Open edX AuthZ — Casbin Model Configuration
#
# This model supports:
# - Scoped role assignments (user roles tied to specific contexts)
# - Action grouping (manage → read/write/edit/delete to reduce duplication)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between write and edit here? Is write just create + edit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this example can also be seen like this:

# manage implies edit and delete
["g2", "act:manage", "act:edit"],
["g2", "act:manage", "act:delete"],
# edit implies read and write
["g2", "act:edit", "act:read"],
["g2", "act:edit", "act:write"],

by inheritance manage would include all permissions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah thank you, I think I missed that there could be multiple levels of inheritance

# - System-wide roles (global scope "*" applies everywhere)
# - Negative rules (deny overrides allow for exceptions)
# - Namespace support (course:*, lib:*, org:*, etc.)
# - Extensibility (new resource types just need new namespaces)
############################################

[request_definition]
# Request format: subject (user), action, scope (specific resource being accessed)
#
# sub = subject/principal with namespace (e.g., "user:alice", "service:lms")
# act = action with namespace (e.g., "act:read", "act:manage", "act:edit-courses")
# scope = authorization scope context (e.g., "org:OpenedX", "course-v1:...", "*" for global)
#
# SCOPE SEMANTICS:
# Scope determines the authorization context and which role assignments apply
# - "*" = global scope (system-wide roles apply everywhere)
# - "org:..." = organization-scoped roles (apply within specific organization)
# - "course-v1:..." = course-scoped roles (apply within specific course)
# - "lib:..." = library-scoped roles (apply within specific library)
#
# Application must provide appropriate scope based on business logic.
r = sub, act, scope

[policy_definition]
# Policy format: subject (role), action, scope (pattern), effect
#
# sub = role or user with namespace (e.g., "role:org_admin", "user:bob")
# act = action identifier (e.g., "act:manage", "act:read", "act:edit-courses")
# scope = scope where policy applies (e.g., "*", "org:*", "course-v1:*", "lib:*")
# eft = "allow" or "deny" (deny overrides allow for exceptions)
p = sub, act, scope, eft

[role_definition]
# g: Role assignments with scope
# Format: user/subject, role, scope
#
# Examples:
# g, user:alice, role:org_admin, org:OpenedX # Alice is org admin for OpenedX
# g, user:bob, role:course_instructor, course-v1:... # Bob is instructor for specific course
# g, user:carol, role:library_admin, * # Carol is global library admin
#
# Role hierarchy (optional):
# g, role:org_admin, role:org_editor, org:OpenedX # org_admin inherits org_editor permissions
g = _, _, _

# g2: Action grouping and implications
# Maps high-level actions to specific actions to reduce policy duplication
#
# Examples:
# g2, act:manage, act:edit # manage implies edit
# g2, act:manage, act:delete # manage implies delete
# g2, act:edit-courses, act:read # edit-courses implies read (for resource access)
# g2, act:edit-courses, act:write # edit-courses implies write (for resource modification)
g2 = _, _

[policy_effect]
# Deny-override policy: allow if any rule allows AND no rule denies
# This enables negative rules/exceptions (e.g., "manage all courses except course Z")
#
# Evaluation order:
# 1. Check if any policy grants allow
# 2. Check if any policy specifies deny
# 3. If deny found, result is deny (exceptions win)
# 4. If allow found and no deny, result is allow
# 5. If no matches, result is deny (default secure)
e = some(where (p.eft == allow)) && !some(where (p.eft == deny))

[matchers]
# Authorization matching logic
#
# ROLE MATCHING:
# - g(r.sub, p.sub, r.scope): check if subject has role in requested scope
# - g(r.sub, p.sub, "*"): check if subject has role in all resources in the scope
#
# SCOPE MATCHING:
# - keyMatch(r.scope, p.scope): scope matches pattern
#
# ACTION MATCHING:
# - r.act == p.act: exact action match
# - g2(p.act, r.act): policy action implies requested action via grouping
#
# All conditions must be true for a policy to match:
# 1. Subject must have role in scope OR global role
# 2. Scope must match pattern
# 3. Action must match OR inherit via action grouping
m = (g(r.sub, p.sub, r.scope) || g(r.sub, p.sub, "*")) && keyMatch(r.scope, p.scope) && (r.act == p.act || g2(p.act, r.act))
Empty file.
Empty file.
191 changes: 191 additions & 0 deletions openedx_authz/management/commands/enforcement.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
"""
Django management command for interactive Casbin enforcement testing.

This command creates a Casbin enforcer using the model.conf configuration and a
user-specified policy file, then provides an interactive mode for testing
authorization enforcement requests.

The command supports:
- Loading Casbin model from the built-in model.conf file or a custom file (specified via --model-file-path argument)
- Using custom policy files (specified via --policy-file-path argument)
- Interactive testing with format: subject action scope
- Real-time enforcement results with visual feedback (✓ ALLOWED / ✗ DENIED)
- Display of loaded policies, role assignments, and action grouping rules

Example usage:
python manage.py enforcement --policy-file-path /path/to/authz.policy

python manage.py enforcement --policy-file-path /path/to/authz.policy --model-file-path /path/to/model.conf

Example test input:
user:alice act:read org:OpenedX
"""

import argparse
import os

import casbin
from django.core.management.base import BaseCommand, CommandError

from openedx_authz import ROOT_DIRECTORY


class Command(BaseCommand):
"""
Django management command for interactive Casbin enforcement testing.

This command loads a Casbin model configuration and user-specified policy file
to create an enforcer instance, then provides an interactive shell for testing
authorization requests in real-time with immediate feedback.
"""

help = (
"Interactive mode for testing Casbin enforcement policies using a custom model file and"
"a custom policy file. Provides real-time authorization testing with format: subject action scope. "
"Use --policy-file-path to specify the policy file location. "
"Use --model-file-path to specify the model file location. "
)

def add_arguments(self, parser: argparse.ArgumentParser) -> None:
"""Add command-line arguments to the argument parser.

Args:
parser (argparse.ArgumentParser): The Django argument parser instance to configure.
"""
parser.add_argument(
"--policy-file-path",
type=str,
required=True,
help="Path to the Casbin policy file (supports CSV format with policies, roles, and action grouping)",
)
parser.add_argument(
"--model-file-path",
type=str,
required=False,
help="Path to the Casbin model file. If not provided, the default model.conf file will be used.",
)

def handle(self, *args, **options):
"""Execute the enforcement testing command.

Loads the Casbin model and policy files, creates an enforcer instance,
displays configuration summary, and starts the interactive testing mode.

Args:
*args: Positional command arguments (unused).
**options: Command options including `policy_file_path` and `model_file_path`.

Raises:
CommandError: If model or policy files are not found or enforcer creation fails.
"""
model_file_path = self._get_file_path("model.conf") or options["model_file_path"]
policy_file_path = options["policy_file_path"]

if not os.path.isfile(model_file_path):
raise CommandError(f"Model file not found: {model_file_path}")
if not os.path.isfile(policy_file_path):
raise CommandError(f"Policy file not found: {policy_file_path}")

self.stdout.write(self.style.SUCCESS("Casbin Interactive Enforcement"))
self.stdout.write(f"Model file path: {model_file_path}")
self.stdout.write(f"Policy file path: {policy_file_path}")
self.stdout.write("")

try:
enforcer = casbin.Enforcer(model_file_path, policy_file_path)
self.stdout.write(self.style.SUCCESS("Casbin enforcer created successfully"))

policies = enforcer.get_policy()
roles = enforcer.get_grouping_policy()
action_grouping = enforcer.get_named_grouping_policy("g2")

self.stdout.write(f"✓ Loaded {len(policies)} policies")
self.stdout.write(f"✓ Loaded {len(roles)} role assignments")
self.stdout.write(f"✓ Loaded {len(action_grouping)} action grouping rules")
self.stdout.write("")

self._run_interactive_mode(enforcer)

except Exception as e:
raise CommandError(f"Error creating Casbin enforcer: {str(e)}") from e

def _get_file_path(self, file_name: str) -> str:
"""Construct the full file path for a configuration file.

Args:
file_name (str): The name of the configuration file (e.g., 'model.conf').

Returns:
str: The absolute path to the configuration file in the engine/config directory.
"""
return os.path.join(ROOT_DIRECTORY, "engine", "config", file_name)

def _run_interactive_mode(self, enforcer: casbin.Enforcer) -> None:
"""Start the interactive enforcement testing shell.

Provides a continuous loop where users can input enforcement requests
in the format 'subject action scope' and receive immediate
authorization results with visual feedback.

Args:
enforcer (casbin.Enforcer): The configured Casbin enforcer instance for testing.

Note:
Exit the interactive mode with Ctrl+C or Ctrl+D.
"""
self.stdout.write(self.style.SUCCESS("Interactive Mode"))
self.stdout.write("Test custom enforcement requests interactively.")
self.stdout.write("Enter 'quit', 'exit', or 'q' to exit the interactive mode.")
self.stdout.write("")
self.stdout.write("Format: subject action scope")
self.stdout.write("Example: user:alice act:read org:OpenedX")
self.stdout.write("")

while True:
try:
user_input = input("Enter enforcement test: ").strip()

if not user_input:
continue

if user_input.lower() in ["quit", "exit", "q"]:
break

self._test_interactive_request(enforcer, user_input)
except (KeyboardInterrupt, EOFError):
self.stdout.write(self.style.ERROR("Exiting interactive mode..."))
break

def _test_interactive_request(self, enforcer: casbin.Enforcer, user_input: str) -> None:
"""Process and test a single enforcement request from user input.

Parses the input string, validates the format, executes the enforcement
check, and displays the result with appropriate styling.

Args:
enforcer (casbin.Enforcer): The Casbin enforcer instance to use for testing.
user_input (str): The user's input string in format 'subject action scope'.

Expected format:
subject: The requesting entity (e.g., 'user:alice')
action: The requested action (e.g., 'act:read')
scope: The authorization context (e.g., 'org:OpenedX')
"""
try:
parts = [part.strip() for part in user_input.split()]
if len(parts) != 3:
self.stdout.write(self.style.ERROR(f"✗ Invalid format. Expected 3 parts, got {len(parts)}"))
self.stdout.write("Format: subject action scope")
self.stdout.write("Example: user:alice act:read org:OpenedX")
return

subject, action, scope = parts
result = enforcer.enforce(subject, action, scope)

if result:
self.stdout.write(self.style.SUCCESS(f"✓ ALLOWED: {subject} {action} {scope}"))
else:
self.stdout.write(self.style.ERROR(f"✗ DENIED: {subject} {action} {scope}"))

except (ValueError, IndexError, TypeError) as e:
self.stdout.write(self.style.ERROR(f"✗ Error processing request: {str(e)}"))
5 changes: 5 additions & 0 deletions openedx_authz/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""
Tests package for openedx_authz.

This package contains all tests for the openedx_authz application.
"""
Loading