Skip to content

Commit

Permalink
Merge pull request #4 from qld-gov-au/develop
Browse files Browse the repository at this point in the history
Develop to master
  • Loading branch information
ThrawnCA authored Oct 14, 2024
2 parents 3a31603 + 33b0d36 commit 779caa9
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 21 deletions.
21 changes: 21 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[flake8]
# @see https://flake8.pycqa.org/en/latest/user/configuration.html?highlight=.flake8

exclude =
ckan

# Extended output format.
format = pylint

# Show the source of errors.
show_source = True
statistics = True

max-complexity = 10
max-line-length = 127

# List ignore rules one per line.
ignore =
C901
E501
W503
15 changes: 13 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
name: Tests
on: [push, pull_request]
on:
push:
pull_request:
branches:
- master

jobs:
test:
strategy:
Expand Down Expand Up @@ -34,7 +39,13 @@ jobs:
CKAN_REDIS_URL: redis://redis:6379/1

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4

- name: Install lint requirements
run: pip install flake8 pycodestyle
- name: Check syntax
run: flake8

- name: Install requirements
# Install any extra requirements your extension has here (dev requirements, other extensions etc)
run: |
Expand Down
28 changes: 22 additions & 6 deletions ckanext/oidc_pkce/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
CONFIG_REDIRECT_PATH = "ckanext.oidc_pkce.redirect_path"
DEFAULT_REDIRECT_PATH = "/user/login/oidc-pkce/callback"

CONFIG_LOGOUT_PATH = "ckanext.oidc_pkce.logout_path"
DEFAULT_LOGOUT_PATH = ""

CONFIG_ERROR_REDIRECT = "ckanext.oidc_pkce.error_redirect"
DEFAULT_ERROR_REDIRECT = None

Expand Down Expand Up @@ -80,17 +83,17 @@ def auth_url() -> str:


def token_path() -> str:
"""Path(without base URL) where authorization token can be retrived."""
"""Path (without base URL) where authorization token can be retrieved."""
return tk.config.get(CONFIG_TOKEN_PATH, DEFAULT_TOKEN_PATH)


def token_url() -> str:
"""SSO URL where authorization token can be retrived."""
"""SSO URL where authorization token can be retrieved."""
return base_url() + token_path()


def redirect_path() -> str:
"""Path(without base URL) that handles authentication response."""
"""Path (without base URL) that handles authentication response."""

return tk.config.get(CONFIG_REDIRECT_PATH, DEFAULT_REDIRECT_PATH)

Expand All @@ -101,15 +104,28 @@ def redirect_url() -> str:


def userinfo_path() -> str:
"""Path(without base URL) where user info can be retrived."""
"""Path (without base URL) where user info can be retrieved."""
return tk.config.get(CONFIG_USERINFO_PATH, DEFAULT_USERINFO_PATH)


def userinfo_url() -> str:
"""SSO URL where user info can be retrived."""
"""SSO URL where user info can be retrieved."""
return base_url() + userinfo_path()


def logout_path() -> str:
"""Path (without base URL) that handles logout."""
return tk.config.get(CONFIG_LOGOUT_PATH) or DEFAULT_LOGOUT_PATH


def logout_url() -> str:
"""CKAN URL that handles authentication response."""
url = logout_path()
if url:
url = base_url() + url
return url


def error_redirect() -> Optional[str]:
"""Destination for redirect after the failed login attempt."""
return tk.config.get(CONFIG_ERROR_REDIRECT, DEFAULT_ERROR_REDIRECT)
Expand All @@ -131,5 +147,5 @@ def munge_password() -> bool:


def scope() -> str:
"""Scope of the user info retrived from SSO application"""
"""Scope of the user info retrieved from SSO application"""
return tk.config.get(CONFIG_SCOPE, DEFAULT_SCOPE)
15 changes: 11 additions & 4 deletions ckanext/oidc_pkce/config_declaration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,30 @@ groups:
default: /oauth2/default/v1/authorize
example: /auth
description: |
Path to the authorization endpont inside SSO application
Path to the authorization endpoint inside SSO application.
- key: ckanext.oidc_pkce.token_path
default: /oauth2/default/v1/token
example: /token
description: Path to the token endpont inside SSO application
description: Path to the token endpoint inside SSO application.

- key: ckanext.oidc_pkce.userinfo_path
default: /oauth2/default/v1/userinfo
example: /userinfo
description: Path to the userinfo endpont inside SSO application
description: Path to the userinfo endpoint inside SSO application.

- key: ckanext.oidc_pkce.logout_path
default: null
example: /logout
description: |
Path to the logout endpoint inside SSO application.
If not provided, logouts will not be propagated to SSO.
- key: ckanext.oidc_pkce.redirect_path
default: /user/login/oidc-pkce/callback
example: /local/oidc/handler
description: |
Path to the authentication response handler inside CKAN application
Path to the authentication response handler inside CKAN application.
- key: ckanext.oidc_pkce.error_redirect
default: null
Expand Down
16 changes: 13 additions & 3 deletions ckanext/oidc_pkce/interfaces.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# encoding: utf-8

from __future__ import annotations

import logging
import secrets
from typing import Any, Optional

Expand All @@ -10,6 +13,8 @@

from . import config, signals

log = logging.getLogger(__name__)


class IOidcPkce(Interface):
""" """
Expand All @@ -26,10 +31,15 @@ def get_oidc_user(self, userinfo: dict[str, Any]) -> Optional[model.User]:
signals.user_exist.send(user.id)
return user

user = q.filter(
users = q.filter(
model.User.email.ilike(userinfo["email"])
).one_or_none()
if user:
).all()
if len(users) > 1:
log.error("Unable to uniquely identify account, found %s matches for: %s",
len(users), userinfo["email"])
return None
elif users:
user = users[0]
admin = tk.get_action("get_site_user")({"ignore_auth": True}, {})
user_dict = tk.get_action("user_show")(
{"user": admin["name"]},
Expand Down
63 changes: 59 additions & 4 deletions ckanext/oidc_pkce/plugin.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
from __future__ import annotations

import logging
from typing import Optional

from flask import redirect
from flask.wrappers import Response

import ckan.plugins as p
import ckan.plugins.toolkit as tk
from ckan import model
from ckan.common import session
from ckan.views import user as user_view

from . import helpers, interfaces, utils, views
from . import config, helpers, interfaces, utils, views

log = logging.getLogger(__name__)

try:
config_declarations = tk.blanket.config_declarations
Expand All @@ -18,26 +23,76 @@ def config_declarations(cls):
return cls


def _current_user():
if tk.check_ckan_version('2.10'):
from ckan.common import current_user
return current_user
return tk.g.userobj


@config_declarations
class OidcPkcePlugin(p.SingletonPlugin):
p.implements(p.IBlueprint)
p.implements(p.IConfigurer)
p.implements(p.ITemplateHelpers)
p.implements(p.IAuthenticator, inherit=True)
p.implements(interfaces.IOidcPkce, inherit=True)

# IBlueprint
def get_blueprint(self):
return views.get_blueprints()

# IConfigurer

def update_config(self, config_):
tk.add_template_directory(config_, 'templates')

# ITemplateHelpers
def get_helpers(
self,
):
return helpers.get_helpers()

if not tk.check_ckan_version("2.10"):
p.implements(p.IAuthenticator, inherit=True)
# IAuthenticator

if tk.check_ckan_version("2.10"):

def logout(self):
""" We want to return a view after the regular logout logic,
rather than before.
We set a flag to indicate that we're in the middle of logout,
then call the regular logout view. The view calls a second
instance of this function, which detects the flag and no-ops,
allowing the view to proceed and wipe the session.
After it completes, we assemble a redirect and pass that back
to the code that originally called this function.
"""
if session.pop("_in_logout", False):
log.debug("SSO logout found in-progress flag, skipping recursive call")
return None
current_user = _current_user()
if not current_user.is_authenticated:
log.info("No current user found, skipping SSO logout")
return None
plugin_extras = getattr(current_user, 'plugin_extras', None)
if not plugin_extras or not plugin_extras.get('oidc_pkce'):
log.info("Current user [%s] is not associated with SSO, skipping SSO logout",
current_user.name)
return None

# IAuthenticator
log.info("Logging out [%s]", current_user.name)
sso_logout_url = config.logout_url()
if not sso_logout_url:
log.info("No SSO logout path configured, logout of [%s] will be local only",
current_user.name)
return None
session["_in_logout"] = True
original_response = user_view.logout()
log.debug("Redirecting [%s] to SSO logout: %s", current_user.name, sso_logout_url)
return redirect(sso_logout_url + '?redirect_uri=' + original_response.location)
else:
def identify(self) -> Optional[Response]:
user = model.User.get(session.get(utils.SESSION_USER))
if user:
Expand Down
14 changes: 14 additions & 0 deletions ckanext/oidc_pkce/templates/user/login.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{% ckan_extends %}
{#
Adds a link from the login form to the single sign-on provider.
#}

{% block form %}
<p><a href="{{ h.url_for('user.login') }}/oidc-pkce" class="btn btn-primary"><i class="fa fa-sign-in"></i> Log in via single sign-on</a></p>
{{ super() }}
{% endblock %}

{% block help_register_button %}
{{ super() }}
<a href="{{ h.url_for('user.login') }}/oidc-pkce" class="btn btn-secondary">Register via single sign-on</a>
{% endblock %}
1 change: 1 addition & 0 deletions ckanext/oidc_pkce/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from ckan.tests.factories import User


@register(_name="user_info")
class UserInfoFactory(factory.DictFactory):
sub = factory.Faker("uuid4")
Expand Down
65 changes: 65 additions & 0 deletions ckanext/oidc_pkce/tests/test_plugin.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,74 @@
# encoding: utf-8

from flask import redirect

from unittest.mock import MagicMock
import pytest

import ckan.plugins as p

from ckanext.oidc_pkce import plugin as plugin_module


class MockUser(object):
""" Stub class to represent a logged-in user.
"""

def __init__(self, name, oidc_enabled=False):
""" Set up a stub name to return.
"""
self.name = name
self.is_authenticated = True if name else False
if oidc_enabled:
self.plugin_extras = {'oidc_pkce': {
'sub': 1, 'name': name, 'email': 'test@localhost'
}}


@pytest.mark.ckan_config("ckan.plugins", "oidc_pkce")
@pytest.mark.usefixtures("with_plugins")
def test_plugin():
assert p.plugin_loaded("oidc_pkce")


if p.toolkit.check_ckan_version('2.10'):

@pytest.mark.ckan_config("ckanext.oidc_pkce.base_url", "http://unit-test-sso")
@pytest.mark.ckan_config("ckanext.oidc_pkce.logout_path", "/logout")
def test_logout_flow():
plugin_module.user_view.logout = MagicMock()
plugin_module.user_view.logout.return_value = redirect("http://unit-test-ckan/logged_out")
plugin_module.tk = MagicMock()
plugin = plugin_module.OidcPkcePlugin()
plugin_module.session = {}

# no-op due to not being logged in
plugin_module._current_user = lambda: MockUser(None)
assert plugin.logout() is None
plugin_module.user_view.logout.assert_not_called()

# no-op due to not being SSO-enabled
plugin_module._current_user = lambda: MockUser('test')
assert plugin.logout() is None
plugin_module.user_view.logout.assert_not_called()

# no-op due to flag in session
plugin_module.session['_in_logout'] = True
plugin_module._current_user = lambda: MockUser('test', True)
assert plugin.logout() is None
plugin_module.user_view.logout.assert_not_called()
assert plugin_module.session == {}

# call core view and issue redirect
result = plugin.logout()
assert result.location == 'http://unit-test-sso/logout?redirect_uri=http://unit-test-ckan/logged_out'

@pytest.mark.ckan_config("ckanext.oidc_pkce.base_url", "http://unit-test")
def test_logout_disabled():
plugin_module.user_view = MagicMock()
plugin = plugin_module.OidcPkcePlugin()
plugin_module._current_user = lambda: MockUser('test', True)

# no-op due to config not having a logout path
assert plugin.logout() is None
plugin_module.user_view.logout.assert_not_called()
2 changes: 1 addition & 1 deletion ckanext/oidc_pkce/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def sync_user(userinfo: dict[str, Any]) -> Optional[model.User]:

user = plugin.get_oidc_user(userinfo)
if not user:
log.error("Cannot locate user/create using OIDC info: %s", userinfo)
log.error("Cannot locate or create unique user using OIDC info: %s", userinfo)
return

return user
Expand Down
Loading

0 comments on commit 779caa9

Please sign in to comment.