Skip to content

Commit

Permalink
Merge pull request #851 from RJPercival/next_url_in_relay_state
Browse files Browse the repository at this point in the history
Pass "next" URL through SAML RelayState
  • Loading branch information
digismack authored Nov 10, 2023
2 parents b9d9937 + b631796 commit fcc6894
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 13 deletions.
2 changes: 1 addition & 1 deletion social_core/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def do_complete(backend, login, user=None, redirect_name="next", *args, **kwargs
# clean partial data after usage
backend.strategy.clean_partial_pipeline(partial.token)
else:
user = backend.complete(user=user, *args, **kwargs)
user = backend.complete(user=user, redirect_name=redirect_name, *args, **kwargs)

# pop redirect value before the session is trashed on login(), but after
# the pipeline so that the pipeline can change the redirect if needed
Expand Down
28 changes: 25 additions & 3 deletions social_core/backends/saml.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
"Identity Provider" (IdP): The third-party site that is authenticating
users via SAML
"""
import json

from onelogin.saml2.auth import OneLogin_Saml2_Auth
from onelogin.saml2.settings import OneLogin_Saml2_Settings

Expand Down Expand Up @@ -278,8 +280,12 @@ def auth_url(self):
# Below, return_to sets the RelayState, which can contain
# arbitrary data. We use it to store the specific SAML IdP
# name, since we multiple IdPs share the same auth_complete
# URL.
return auth.login(return_to=idp_name)
# URL, and the URL to redirect to after auth completes.
relay_state = {
"idp": idp_name,
"next": self.data.get("next"),
}
return auth.login(return_to=json.dumps(relay_state))

def get_user_details(self, response):
"""Get user details like full name, email, etc. from the
Expand All @@ -303,10 +309,26 @@ def auth_complete(self, *args, **kwargs):
now log them in, if everything checks out.
"""
try:
idp_name = self.strategy.request_data()["RelayState"]
relay_state_str = self.strategy.request_data()["RelayState"]
except KeyError:
raise AuthMissingParameter(self, "RelayState")

try:
relay_state = json.loads(relay_state_str)
if not isinstance(relay_state, dict) or "idp" not in relay_state:
raise ValueError(
"RelayState is expected to contain a JSON object with an 'idp' key"
)
except ValueError:
# Assume RelayState is just the idp_name, as it used to be in previous versions of this code.
# This ensures compatibility with previous versions.
idp_name = relay_state_str
else:
idp_name = relay_state["idp"]
if next_url := relay_state.get("next"):
# The do_complete action expects the "next" URL to be in session state or the request params.
self.strategy.session_set(kwargs.get("redirect_name", "next"), next_url)

idp = self.get_idp(idp_name)
auth = self._create_saml_auth(idp)
auth.process_response()
Expand Down
5 changes: 4 additions & 1 deletion social_core/tests/actions/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class BaseActionTest(unittest.TestCase):

def __init__(self, *args, **kwargs):
self.strategy = None
self.backend = None
super().__init__(*args, **kwargs)

def setUp(self):
Expand All @@ -63,7 +64,9 @@ def setUp(self):
TestAssociation.reset_cache()
Backend = module_member("social_core.backends.github.GithubOAuth2")
self.strategy = self.strategy or TestStrategy(TestStorage)
self.backend = Backend(self.strategy, redirect_uri="/complete/github")
self.backend = self.backend or Backend(
self.strategy, redirect_uri="/complete/github"
)
self.user = None

def tearDown(self):
Expand Down
33 changes: 32 additions & 1 deletion social_core/tests/actions/test_login.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,33 @@
from ...backends.base import BaseAuth
from ...utils import PARTIAL_TOKEN_SESSION_NAME
from ..models import User
from ..models import TestUserSocialAuth, User
from .actions import BaseActionTest


class BackendThatControlsRedirect(BaseAuth):
"""
A fake backend that sets the URL to redirect to after login.
It is not always possible to set the redirect URL in the session state prior to auth and then retrieve it when
auth is complete, because the session cookie might not be available post-auth. For example, for SAML, a POST request
redirects the user from the IdP (Identity Provider) back to the SP (Service Provider) to complete the auth process,
but the session cookie will not be present if the session cookie's `SameSite` attribute is not set to "None".
To mitigate this, SAML provides a `RelayState` parameter to pass data like a redirect URL from the SP to the IdP
and back again. In that case, the redirect URL is only known in `auth_complete`, and must be communicated back to
the `do_complete` action via session state so that it can issue the intended redirect.
"""

ACCESS_TOKEN_URL = "https://example.com/oauth/access_token"

def auth_url(self):
return "https://example.com/oauth/auth?state=foo"

def auth_complete(self, *args, **kwargs):
# Put the redirect URL in the session state, as this is where the `do_complete` action looks for it.
self.strategy.session_set(kwargs["redirect_name"], "/after-login")
return kwargs["user"]


class LoginActionTest(BaseActionTest):
def test_login(self):
self.do_login()
Expand Down Expand Up @@ -37,6 +62,12 @@ def test_redirect_value(self):
redirect = self.do_login(after_complete_checks=False)
self.assertEqual(redirect.url, "/after-login")

def test_redirect_value_set_by_backend(self):
self.backend = BackendThatControlsRedirect(self.strategy)
self.user = TestUserSocialAuth.create_user("test-user")
redirect = self.do_login(after_complete_checks=False)
self.assertEqual(redirect.url, "/after-login")

def test_login_with_invalid_partial_pipeline(self):
def before_complete():
partial_token = self.strategy.session_get(PARTIAL_TOKEN_SESSION_NAME)
Expand Down
2 changes: 1 addition & 1 deletion social_core/tests/backends/data/saml_response.txt

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions social_core/tests/backends/data/saml_response_legacy.txt

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

54 changes: 48 additions & 6 deletions social_core/tests/backends/test_saml.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
class SAMLTest(BaseBackendTest):
backend_path = "social_core.backends.saml.SAMLAuth"
expected_username = "myself"
response_fixture = "saml_response.txt"

def extra_settings(self):
name = path.join(DATA_DIR, "saml_config.json")
Expand All @@ -58,7 +59,7 @@ def install_http_intercepts(self, start_url, return_url):
# we will eventually get a redirect back, with SAML assertion
# data in the query string. A pre-recorded correct response
# is kept in this .txt file:
name = path.join(DATA_DIR, "saml_response.txt")
name = path.join(DATA_DIR, self.response_fixture)
with open(name) as response_file:
response_url = response_file.read()
HTTPretty.register_uri(
Expand Down Expand Up @@ -91,14 +92,55 @@ def test_metadata_generation(self):
self.assertEqual(len(errors), 0)
self.assertEqual(xml.decode()[0], "<")

def test_login(self):
"""Test that we can authenticate with a SAML IdP (TestShib)"""
# pretend we've started with a URL like /login/saml/?idp=testshib:
def test_login_with_next_url(self):
"""
Test that we login and then redirect to the "next" URL.
"""
# pretend we've started with a URL like /login/saml/?idp=testshib&next=/foo/bar
self.strategy.set_request_data(
{"idp": "testshib", "next": "/foo/bar"}, self.backend
)
self.do_login()
# The core `do_complete` action assumes the "next" URL is stored in session state or the request data.
self.assertEqual(self.strategy.session_get("next"), "/foo/bar")

def test_login_no_next_url(self):
"""
Test that we handle "next" being omitted from the request data and RelayState.
"""
self.response_fixture = "saml_response_no_next_url.txt"

# pretend we've started with a URL like /login/saml/?idp=testshib
self.strategy.set_request_data({"idp": "testshib"}, self.backend)
self.do_login()
self.assertEqual(self.strategy.session_get("next"), None)

def test_login_with_legacy_relay_state(self):
"""
Test that we handle legacy RelayState (i.e. just the IDP name, not a JSON object).
This is the form that RelayState had in prior versions of this library. It should be supported for backwards
compatibility.
"""
self.response_fixture = "saml_response_legacy.txt"

self.strategy.set_request_data({"idp": "testshib"}, self.backend)
self.do_login()

def test_login_no_idp_in_initial_request(self):
"""
Logging in without an idp param should raise AuthMissingParameter
"""
with self.assertRaises(AuthMissingParameter):
self.do_start()

def test_login_no_idp_in_saml_response(self):
"""
The RelayState should always contain a JSON object with an "idp" key, or be just the IDP name as a string.
This tests that an exception is raised if it is a JSON object, but is missing the "idp" key.
"""
self.response_fixture = "saml_response_no_idp_name.txt"

def test_login_no_idp(self):
"""Logging in without an idp param should raise AuthMissingParameter"""
with self.assertRaises(AuthMissingParameter):
self.do_start()

Expand Down

0 comments on commit fcc6894

Please sign in to comment.