diff --git a/cms/envs/test.py b/cms/envs/test.py index 6a7c17b001fe..500c8d538d39 100644 --- a/cms/envs/test.py +++ b/cms/envs/test.py @@ -26,7 +26,8 @@ from .common import * # import settings from LMS for consistent behavior with CMS -from lms.envs.test import ( # pylint: disable=wrong-import-order +from lms.envs.test import ( # pylint: disable=wrong-import-order, disable=unused-import + ACCOUNT_MICROFRONTEND_URL, COMPREHENSIVE_THEME_DIRS, # unimport:skip DEFAULT_FILE_STORAGE, ECOMMERCE_API_URL, @@ -35,8 +36,10 @@ LOGIN_ISSUE_SUPPORT_LINK, MEDIA_ROOT, MEDIA_URL, + ORDER_HISTORY_MICROFRONTEND_URL, PLATFORM_DESCRIPTION, PLATFORM_NAME, + PROFILE_MICROFRONTEND_URL, REGISTRATION_EXTRA_FIELDS, GRADES_DOWNLOAD, SITE_NAME, @@ -51,28 +54,26 @@ STUDIO_SHORT_NAME = gettext_lazy("𝓢𝓽𝓾𝓭𝓲𝓸") # Allow all hosts during tests, we use a lot of different ones all over the codebase. -ALLOWED_HOSTS = [ - '*' -] +ALLOWED_HOSTS = ["*"] # mongo connection settings -MONGO_PORT_NUM = int(os.environ.get('EDXAPP_TEST_MONGO_PORT', '27017')) -MONGO_HOST = os.environ.get('EDXAPP_TEST_MONGO_HOST', 'localhost') +MONGO_PORT_NUM = int(os.environ.get("EDXAPP_TEST_MONGO_PORT", "27017")) +MONGO_HOST = os.environ.get("EDXAPP_TEST_MONGO_HOST", "localhost") THIS_UUID = uuid4().hex[:5] -TEST_ROOT = path('test_root') +TEST_ROOT = path("test_root") # Want static files in the same dir for running on jenkins. STATIC_ROOT = TEST_ROOT / "staticfiles" -WEBPACK_LOADER['DEFAULT']['STATS_FILE'] = STATIC_ROOT / "webpack-stats.json" +WEBPACK_LOADER["DEFAULT"]["STATS_FILE"] = STATIC_ROOT / "webpack-stats.json" GITHUB_REPO_ROOT = TEST_ROOT / "data" DATA_DIR = TEST_ROOT / "data" COMMON_TEST_DATA_ROOT = COMMON_ROOT / "test" / "data" # For testing "push to lms" -FEATURES['ENABLE_EXPORT_GIT'] = True +FEATURES["ENABLE_EXPORT_GIT"] = True GIT_REPO_EXPORT_DIR = TEST_ROOT / "export_course_repos" # TODO (cpennington): We need to figure out how envs/test.py can inject things into common.py so that we don't have to repeat this sort of thing # lint-amnesty, pylint: disable=line-too-long @@ -90,51 +91,47 @@ # If we don't add these settings, then Django templates that can't # find pipelined assets will raise a ValueError. # http://stackoverflow.com/questions/12816941/unit-testing-with-django-pipeline -STATICFILES_STORAGE = 'pipeline.storage.NonPackagingPipelineStorage' +STATICFILES_STORAGE = "pipeline.storage.NonPackagingPipelineStorage" STATIC_URL = "/static/" # Update module store settings per defaults for tests update_module_store_settings( MODULESTORE, module_store_options={ - 'default_class': 'xmodule.hidden_block.HiddenBlock', - 'fs_root': TEST_ROOT / "data", + "default_class": "xmodule.hidden_block.HiddenBlock", + "fs_root": TEST_ROOT / "data", }, doc_store_settings={ - 'db': f'test_xmodule_{THIS_UUID}', - 'host': MONGO_HOST, - 'port': MONGO_PORT_NUM, - 'collection': 'test_modulestore', + "db": f"test_xmodule_{THIS_UUID}", + "host": MONGO_HOST, + "port": MONGO_PORT_NUM, + "collection": "test_modulestore", }, ) CONTENTSTORE = { - 'ENGINE': 'xmodule.contentstore.mongo.MongoContentStore', - 'DOC_STORE_CONFIG': { - 'host': MONGO_HOST, - 'db': f'test_xcontent_{THIS_UUID}', - 'port': MONGO_PORT_NUM, - 'collection': 'dont_trip', + "ENGINE": "xmodule.contentstore.mongo.MongoContentStore", + "DOC_STORE_CONFIG": { + "host": MONGO_HOST, + "db": f"test_xcontent_{THIS_UUID}", + "port": MONGO_PORT_NUM, + "collection": "dont_trip", }, # allow for additional options that can be keyed on a name, e.g. 'trashcan' - 'ADDITIONAL_OPTIONS': { - 'trashcan': { - 'bucket': 'trash_fs' - } - } + "ADDITIONAL_OPTIONS": {"trashcan": {"bucket": "trash_fs"}}, } DATABASES = { - 'default': { - 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': TEST_ROOT / "db" / "cms.db", - 'ATOMIC_REQUESTS': True, + "default": { + "ENGINE": "django.db.backends.sqlite3", + "NAME": TEST_ROOT / "db" / "cms.db", + "ATOMIC_REQUESTS": True, }, } LMS_BASE = "localhost:8000" LMS_ROOT_URL = f"http://{LMS_BASE}" -FEATURES['PREVIEW_LMS_BASE'] = "preview.localhost" +FEATURES["PREVIEW_LMS_BASE"] = "preview.localhost" CMS_BASE = "localhost:8001" CMS_ROOT_URL = f"http://{CMS_BASE}" @@ -145,49 +142,47 @@ CACHES = { # This is the cache used for most things. # In staging/prod envs, the sessions also live here. - 'default': { - 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', - 'LOCATION': 'edx_loc_mem_cache', - 'KEY_FUNCTION': 'common.djangoapps.util.memcache.safe_key', + "default": { + "BACKEND": "django.core.cache.backends.locmem.LocMemCache", + "LOCATION": "edx_loc_mem_cache", + "KEY_FUNCTION": "common.djangoapps.util.memcache.safe_key", }, - # The general cache is what you get if you use our util.cache. It's used for # things like caching the course.xml file for different A/B test groups. # We set it to be a DummyCache to force reloading of course.xml in dev. # In staging environments, we would grab VERSION from data uploaded by the # push process. - 'general': { - 'BACKEND': 'django.core.cache.backends.dummy.DummyCache', - 'KEY_PREFIX': 'general', - 'VERSION': 4, - 'KEY_FUNCTION': 'common.djangoapps.util.memcache.safe_key', + "general": { + "BACKEND": "django.core.cache.backends.dummy.DummyCache", + "KEY_PREFIX": "general", + "VERSION": 4, + "KEY_FUNCTION": "common.djangoapps.util.memcache.safe_key", }, - - 'mongo_metadata_inheritance': { - 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', - 'LOCATION': os.path.join(tempfile.gettempdir(), 'mongo_metadata_inheritance'), - 'TIMEOUT': 300, - 'KEY_FUNCTION': 'common.djangoapps.util.memcache.safe_key', + "mongo_metadata_inheritance": { + "BACKEND": "django.core.cache.backends.locmem.LocMemCache", + "LOCATION": os.path.join(tempfile.gettempdir(), "mongo_metadata_inheritance"), + "TIMEOUT": 300, + "KEY_FUNCTION": "common.djangoapps.util.memcache.safe_key", }, - 'loc_cache': { - 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', - 'LOCATION': 'edx_location_mem_cache', + "loc_cache": { + "BACKEND": "django.core.cache.backends.locmem.LocMemCache", + "LOCATION": "edx_location_mem_cache", }, - 'course_structure_cache': { - 'BACKEND': 'django.core.cache.backends.dummy.DummyCache', + "course_structure_cache": { + "BACKEND": "django.core.cache.backends.dummy.DummyCache", }, } ################################# CELERY ###################################### CELERY_ALWAYS_EAGER = True -CELERY_RESULT_BACKEND = 'django-cache' +CELERY_RESULT_BACKEND = "django-cache" CLEAR_REQUEST_CACHE_ON_TASK_COMPLETION = False # test_status_cancel in cms/cms_user_tasks/test.py is failing without this # @override_setting for BROKER_URL is not working in testcase, so updating here -BROKER_URL = 'memory://localhost/' +BROKER_URL = "memory://localhost/" ########################### Server Ports ################################### @@ -202,99 +197,99 @@ ################### Make tests faster # http://slacy.com/blog/2012/04/make-your-tests-faster-in-django-1-4/ PASSWORD_HASHERS = [ - 'django.contrib.auth.hashers.SHA1PasswordHasher', - 'django.contrib.auth.hashers.MD5PasswordHasher', + "django.contrib.auth.hashers.SHA1PasswordHasher", + "django.contrib.auth.hashers.MD5PasswordHasher", ] # No segment key CMS_SEGMENT_KEY = None -FEATURES['DISABLE_SET_JWT_COOKIES_FOR_TESTS'] = True +FEATURES["DISABLE_SET_JWT_COOKIES_FOR_TESTS"] = True -FEATURES['ENABLE_SERVICE_STATUS'] = True +FEATURES["ENABLE_SERVICE_STATUS"] = True # Toggles embargo on for testing -FEATURES['EMBARGO'] = True +FEATURES["EMBARGO"] = True TEST_THEME = COMMON_ROOT / "test" / "test-theme" # For consistency in user-experience, keep the value of this setting in sync with # the one in lms/envs/test.py -FEATURES['ENABLE_DISCUSSION_SERVICE'] = False +FEATURES["ENABLE_DISCUSSION_SERVICE"] = False # Enable a parental consent age limit for testing PARENTAL_CONSENT_AGE_LIMIT = 13 # Enable certificates for the tests -FEATURES['CERTIFICATES_HTML_VIEW'] = True +FEATURES["CERTIFICATES_HTML_VIEW"] = True # Enable content libraries code for the tests -FEATURES['ENABLE_CONTENT_LIBRARIES'] = True +FEATURES["ENABLE_CONTENT_LIBRARIES"] = True -FEATURES['ENABLE_EDXNOTES'] = True +FEATURES["ENABLE_EDXNOTES"] = True # MILESTONES -FEATURES['MILESTONES_APP'] = True +FEATURES["MILESTONES_APP"] = True # ENTRANCE EXAMS -FEATURES['ENTRANCE_EXAMS'] = True +FEATURES["ENTRANCE_EXAMS"] = True ENTRANCE_EXAM_MIN_SCORE_PCT = 50 -VIDEO_CDN_URL = { - 'CN': 'http://api.xuetangx.com/edx/video?s3_url=' -} +VIDEO_CDN_URL = {"CN": "http://api.xuetangx.com/edx/video?s3_url="} # Courseware Search Index -FEATURES['ENABLE_COURSEWARE_INDEX'] = True -FEATURES['ENABLE_LIBRARY_INDEX'] = True +FEATURES["ENABLE_COURSEWARE_INDEX"] = True +FEATURES["ENABLE_LIBRARY_INDEX"] = True SEARCH_ENGINE = "search.tests.mock_search_engine.MockSearchEngine" -FEATURES['ENABLE_ENROLLMENT_TRACK_USER_PARTITION'] = True +FEATURES["ENABLE_ENROLLMENT_TRACK_USER_PARTITION"] = True ########################## AUTHOR PERMISSION ####################### -FEATURES['ENABLE_CREATOR_GROUP'] = False +FEATURES["ENABLE_CREATOR_GROUP"] = False # teams feature -FEATURES['ENABLE_TEAMS'] = True +FEATURES["ENABLE_TEAMS"] = True # Dummy secret key for dev/test -SECRET_KEY = '85920908f28904ed733fe576320db18cabd7b6cd' +SECRET_KEY = "85920908f28904ed733fe576320db18cabd7b6cd" ######### custom courses ######### INSTALLED_APPS += [ - 'openedx.core.djangoapps.ccxcon.apps.CCXConnectorConfig', - 'common.djangoapps.third_party_auth.apps.ThirdPartyAuthConfig', + "openedx.core.djangoapps.ccxcon.apps.CCXConnectorConfig", + "common.djangoapps.third_party_auth.apps.ThirdPartyAuthConfig", ] -FEATURES['CUSTOM_COURSES_EDX'] = True +FEATURES["CUSTOM_COURSES_EDX"] = True ########################## VIDEO IMAGE STORAGE ############################ VIDEO_IMAGE_SETTINGS = dict( - VIDEO_IMAGE_MAX_BYTES=2 * 1024 * 1024, # 2 MB - VIDEO_IMAGE_MIN_BYTES=2 * 1024, # 2 KB + VIDEO_IMAGE_MAX_BYTES=2 * 1024 * 1024, # 2 MB + VIDEO_IMAGE_MIN_BYTES=2 * 1024, # 2 KB STORAGE_KWARGS=dict( location=MEDIA_ROOT, ), - DIRECTORY_PREFIX='video-images/', + DIRECTORY_PREFIX="video-images/", BASE_URL=MEDIA_URL, ) -VIDEO_IMAGE_DEFAULT_FILENAME = 'default_video_image.png' +VIDEO_IMAGE_DEFAULT_FILENAME = "default_video_image.png" ########################## VIDEO TRANSCRIPTS STORAGE ############################ VIDEO_TRANSCRIPTS_SETTINGS = dict( - VIDEO_TRANSCRIPTS_MAX_BYTES=3 * 1024 * 1024, # 3 MB + VIDEO_TRANSCRIPTS_MAX_BYTES=3 * 1024 * 1024, # 3 MB STORAGE_KWARGS=dict( location=MEDIA_ROOT, base_url=MEDIA_URL, ), - DIRECTORY_PREFIX='video-transcripts/', + DIRECTORY_PREFIX="video-transcripts/", ) ####################### Plugin Settings ########################## # pylint: disable=wrong-import-position, wrong-import-order from edx_django_utils.plugins import add_plugins + # pylint: disable=wrong-import-position, wrong-import-order from openedx.core.djangoapps.plugins.constants import ProjectType, SettingsType + add_plugins(__name__, ProjectType.CMS, SettingsType.TEST) ########################## Derive Any Derived Settings ####################### @@ -310,22 +305,22 @@ # Used in edx-proctoring for ID generation in lieu of SECRET_KEY - dummy value # (ref MST-637) -PROCTORING_USER_OBFUSCATION_KEY = '85920908f28904ed733fe576320db18cabd7b6cd' +PROCTORING_USER_OBFUSCATION_KEY = "85920908f28904ed733fe576320db18cabd7b6cd" ##### LOGISTRATION RATE LIMIT SETTINGS ##### -LOGISTRATION_RATELIMIT_RATE = '5/5m' -LOGISTRATION_PER_EMAIL_RATELIMIT_RATE = '6/5m' -LOGISTRATION_API_RATELIMIT = '5/m' +LOGISTRATION_RATELIMIT_RATE = "5/5m" +LOGISTRATION_PER_EMAIL_RATELIMIT_RATE = "6/5m" +LOGISTRATION_API_RATELIMIT = "5/m" -REGISTRATION_VALIDATION_RATELIMIT = '5/minute' -REGISTRATION_RATELIMIT = '5/minute' -OPTIONAL_FIELD_API_RATELIMIT = '5/m' +REGISTRATION_VALIDATION_RATELIMIT = "5/minute" +REGISTRATION_RATELIMIT = "5/minute" +OPTIONAL_FIELD_API_RATELIMIT = "5/m" -RESET_PASSWORD_TOKEN_VALIDATE_API_RATELIMIT = '2/m' -RESET_PASSWORD_API_RATELIMIT = '2/m' +RESET_PASSWORD_TOKEN_VALIDATE_API_RATELIMIT = "2/m" +RESET_PASSWORD_API_RATELIMIT = "2/m" ############### Settings for proctoring ############### -PROCTORING_USER_OBFUSCATION_KEY = 'test_key' +PROCTORING_USER_OBFUSCATION_KEY = "test_key" #################### Network configuration #################### # Tests are not behind any proxies @@ -339,10 +334,5 @@ ############## openedx-learning (Learning Core) config ############## OPENEDX_LEARNING = { - 'MEDIA': { - 'BACKEND': 'django.core.files.storage.InMemoryStorage', - 'OPTIONS': { - 'location': MEDIA_ROOT + "_private" - } - } + "MEDIA": {"BACKEND": "django.core.files.storage.InMemoryStorage", "OPTIONS": {"location": MEDIA_ROOT + "_private"}} } diff --git a/common/djangoapps/student/tests/test_filters.py b/common/djangoapps/student/tests/test_filters.py index 376595a8507b..bf79ed7ae402 100644 --- a/common/djangoapps/student/tests/test_filters.py +++ b/common/djangoapps/student/tests/test_filters.py @@ -1,6 +1,7 @@ """ Test that various filters are fired for models/views in the student app. """ +from django.conf import settings from django.http import HttpResponse from django.test import override_settings from django.urls import reverse @@ -421,7 +422,7 @@ def test_dashboard_redirect_account_settings(self): response = self.client.get(self.dashboard_url) self.assertEqual(status.HTTP_302_FOUND, response.status_code) - self.assertEqual(reverse("account_settings"), response.url) + self.assertEqual(settings.ACCOUNT_MICROFRONTEND_URL, response.url) @override_settings( OPEN_EDX_FILTERS_CONFIG={ diff --git a/common/djangoapps/student/tests/test_views.py b/common/djangoapps/student/tests/test_views.py index 15ecdab23642..b63c522bbd0f 100644 --- a/common/djangoapps/student/tests/test_views.py +++ b/common/djangoapps/student/tests/test_views.py @@ -233,7 +233,7 @@ def test_redirect_account_settings(self): """ UserProfile.objects.get(user=self.user).delete() response = self.client.get(self.path) - self.assertRedirects(response, reverse('account_settings')) + self.assertRedirects(response, settings.ACCOUNT_MICROFRONTEND_URL, target_status_code=302) @patch('common.djangoapps.student.views.dashboard.learner_home_mfe_enabled') def test_redirect_to_learner_home(self, mock_learner_home_mfe_enabled): diff --git a/common/djangoapps/student/views/dashboard.py b/common/djangoapps/student/views/dashboard.py index f729a2aee130..05279fe8cdd3 100644 --- a/common/djangoapps/student/views/dashboard.py +++ b/common/djangoapps/student/views/dashboard.py @@ -518,7 +518,7 @@ def student_dashboard(request): # lint-amnesty, pylint: disable=too-many-statem """ user = request.user if not UserProfile.objects.filter(user=user).exists(): - return redirect(reverse('account_settings')) + return redirect(settings.ACCOUNT_MICROFRONTEND_URL) if learner_home_mfe_enabled(): return redirect(settings.LEARNER_HOME_MICROFRONTEND_URL) @@ -623,7 +623,7 @@ def student_dashboard(request): # lint-amnesty, pylint: disable=too-many-statem "Go to {link_start}your Account Settings{link_end}.") ).format( link_start=HTML("").format( - account_setting_page=reverse('account_settings'), + account_setting_page=settings.ACCOUNT_MICROFRONTEND_URL, ), link_end=HTML("") ) @@ -892,7 +892,7 @@ def student_dashboard(request): # lint-amnesty, pylint: disable=too-many-statem except DashboardRenderStarted.RenderInvalidDashboard as exc: response = render_to_response(exc.dashboard_template, exc.template_context) except DashboardRenderStarted.RedirectToPage as exc: - response = HttpResponseRedirect(exc.redirect_to or reverse('account_settings')) + response = HttpResponseRedirect(exc.redirect_to or settings.ACCOUNT_MICROFRONTEND_URL) except DashboardRenderStarted.RenderCustomResponse as exc: response = exc.response else: diff --git a/common/djangoapps/third_party_auth/api/tests/test_views.py b/common/djangoapps/third_party_auth/api/tests/test_views.py index 948314a0672f..670caf04c7f1 100644 --- a/common/djangoapps/third_party_auth/api/tests/test_views.py +++ b/common/djangoapps/third_party_auth/api/tests/test_views.py @@ -2,10 +2,11 @@ Tests for the Third Party Auth REST API """ +import urllib from unittest.mock import patch import ddt -import six +from django.conf import settings from django.http import QueryDict from django.test.utils import override_settings from django.urls import reverse @@ -219,7 +220,7 @@ def make_url(self, identifier): """ return '?'.join([ reverse('third_party_auth_users_api_v2'), - six.moves.urllib.parse.urlencode(identifier) + urllib.parse.urlencode(identifier) ]) @@ -377,11 +378,12 @@ def test_get(self): """ self.client.login(username=self.user.username, password=PASSWORD) response = self.client.get(self.url, content_type="application/json") + next_url = urllib.parse.quote(settings.ACCOUNT_MICROFRONTEND_URL, safe="") assert response.status_code == 200 assert (response.data == [{ 'accepts_logins': True, 'name': 'Google', 'disconnect_url': '/auth/disconnect/google-oauth2/?', - 'connect_url': '/auth/login/google-oauth2/?auth_entry=account_settings&next=%2Faccount%2Fsettings', + 'connect_url': f'/auth/login/google-oauth2/?auth_entry=account_settings&next={next_url}', 'connected': False, 'id': 'oa2-google-oauth2' }]) diff --git a/common/djangoapps/third_party_auth/api/views.py b/common/djangoapps/third_party_auth/api/views.py index 97d1a7d6dba4..c1127f8e335d 100644 --- a/common/djangoapps/third_party_auth/api/views.py +++ b/common/djangoapps/third_party_auth/api/views.py @@ -9,7 +9,6 @@ from django.contrib.auth.models import User # lint-amnesty, pylint: disable=imported-auth-user from django.db.models import Q from django.http import Http404 -from django.urls import reverse from edx_rest_framework_extensions.auth.jwt.authentication import JwtAuthentication from edx_rest_framework_extensions.auth.session.authentication import SessionAuthenticationAllowInactiveUser from rest_framework import exceptions, permissions, status, throttling @@ -425,7 +424,7 @@ def get(self, request): state.provider.provider_id, pipeline.AUTH_ENTRY_ACCOUNT_SETTINGS, # The url the user should be directed to after the auth process has completed. - redirect_url=reverse('account_settings'), + redirect_url=settings.ACCOUNT_MICROFRONTEND_URL, ), 'accepts_logins': state.provider.accepts_logins, # If the user is connected, sending a POST request to this url removes the connection diff --git a/common/djangoapps/third_party_auth/tests/specs/base.py b/common/djangoapps/third_party_auth/tests/specs/base.py index 8f96235017da..524cd64ff366 100644 --- a/common/djangoapps/third_party_auth/tests/specs/base.py +++ b/common/djangoapps/third_party_auth/tests/specs/base.py @@ -2,7 +2,6 @@ Base integration test for provider implementations. """ - import json import unittest from contextlib import contextmanager @@ -11,7 +10,7 @@ import pytest from django import test from django.conf import settings -from django.contrib import auth +from django.contrib import auth, messages from django.contrib.auth import models as auth_models from django.contrib.messages.storage import fallback from django.contrib.sessions.backends import cache @@ -28,7 +27,6 @@ from openedx.core.djangoapps.user_authn.views.register import RegistrationView from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers from openedx.core.djangoapps.site_configuration.tests.factories import SiteFactory -from openedx.core.djangoapps.user_api.accounts.settings_views import account_settings_context from common.djangoapps.student import models as student_models from common.djangoapps.student.tests.factories import UserFactory @@ -56,9 +54,9 @@ def _check_registration_form_username(self, form_data, test_username, expected): test_username (str): username to check the form initialization with. expected (str): expected cleaned username after the form initialization. """ - form_data['username'] = test_username + form_data["username"] = test_username form_field_data = self.provider.get_register_form_data(form_data) - assert form_field_data['username'] == expected + assert form_field_data["username"] == expected def assert_redirect_to_provider_looks_correct(self, response): """Asserts the redirect to the provider's site looks correct. @@ -70,9 +68,11 @@ def assert_redirect_to_provider_looks_correct(self, response): example, more details about the format of the Location header. """ assert 302 == response.status_code - assert response.has_header('Location') + assert response.has_header("Location") - def assert_register_response_in_pipeline_looks_correct(self, response, pipeline_kwargs, required_fields): # lint-amnesty, pylint: disable=invalid-name + def assert_register_response_in_pipeline_looks_correct( + self, response, pipeline_kwargs, required_fields + ): # lint-amnesty, pylint: disable=invalid-name """Performs spot checks of the rendered register.html page. When we display the new account registration form after the user signs @@ -84,10 +84,7 @@ def assert_register_response_in_pipeline_looks_correct(self, response, pipeline_ assertions in your test, override this method. """ # Check that the correct provider was selected. - self.assertContains( - response, - '"errorMessage": null' - ) + self.assertContains(response, '"errorMessage": null') self.assertContains( response, f'"currentProvider": "{self.provider.name}"', @@ -99,45 +96,67 @@ def assert_register_response_in_pipeline_looks_correct(self, response, pipeline_ if prepopulated_form_data in required_fields: self.assertContains(response, form_field_data[prepopulated_form_data]) - def assert_register_form_populates_unicode_username_correctly(self, request): # lint-amnesty, pylint: disable=invalid-name + def _get_user_providers_state(self, request): """ - Check the registration form username field behaviour with unicode values. - - The field could be empty or prefilled depending on whether ENABLE_UNICODE_USERNAME feature is disabled/enabled. + Return provider user states and duplicated providers. """ - unicode_username = 'Червона_Калина' - ascii_substring = 'untouchable' - partial_unicode_username = unicode_username + ascii_substring - pipeline_kwargs = pipeline.get(request)['kwargs'] - - assert settings.FEATURES['ENABLE_UNICODE_USERNAME'] is False - - self._check_registration_form_username(pipeline_kwargs, unicode_username, '') - self._check_registration_form_username(pipeline_kwargs, partial_unicode_username, ascii_substring) - - with mock.patch.dict('django.conf.settings.FEATURES', {'ENABLE_UNICODE_USERNAME': True}): - self._check_registration_form_username(pipeline_kwargs, unicode_username, unicode_username) - - # pylint: disable=invalid-name - def assert_account_settings_context_looks_correct(self, context, duplicate=False, linked=None): - """Asserts the user's account settings page context is in the expected state. + data = { + "auth": {}, + } + data["duplicate_provider"] = pipeline.get_duplicate_provider(messages.get_messages(request)) + auth_states = pipeline.get_provider_user_states(request.user) + data["auth"]["providers"] = [ + { + "name": state.provider.name, + "connected": state.has_account, + } + for state in auth_states + if state.provider.display_for_login or state.has_account + ] + return data + + def assert_third_party_accounts_state(self, request, duplicate=False, linked=None): + """ + Asserts the user's third party account in the expected state. - If duplicate is True, we expect context['duplicate_provider'] to contain + If duplicate is True, we expect data['duplicate_provider'] to contain the duplicate provider backend name. If linked is passed, we conditionally - check that the provider is included in context['auth']['providers'] and + check that the provider is included in data['auth']['providers'] and its connected state is correct. """ + data = self._get_user_providers_state(request) if duplicate: - assert context['duplicate_provider'] == self.provider.backend_name + assert data["duplicate_provider"] == self.provider.backend_name else: - assert context['duplicate_provider'] is None + assert data["duplicate_provider"] is None if linked is not None: expected_provider = [ - provider for provider in context['auth']['providers'] if provider['name'] == self.provider.name + provider for provider in data["auth"]["providers"] if provider["name"] == self.provider.name ][0] assert expected_provider is not None - assert expected_provider['connected'] == linked + assert expected_provider["connected"] == linked + + def assert_register_form_populates_unicode_username_correctly( + self, request + ): # lint-amnesty, pylint: disable=invalid-name + """ + Check the registration form username field behaviour with unicode values. + + The field could be empty or prefilled depending on whether ENABLE_UNICODE_USERNAME feature is disabled/enabled. + """ + unicode_username = "Червона_Калина" + ascii_substring = "untouchable" + partial_unicode_username = unicode_username + ascii_substring + pipeline_kwargs = pipeline.get(request)["kwargs"] + + assert settings.FEATURES["ENABLE_UNICODE_USERNAME"] is False + + self._check_registration_form_username(pipeline_kwargs, unicode_username, "") + self._check_registration_form_username(pipeline_kwargs, partial_unicode_username, ascii_substring) + + with mock.patch.dict("django.conf.settings.FEATURES", {"ENABLE_UNICODE_USERNAME": True}): + self._check_registration_form_username(pipeline_kwargs, unicode_username, unicode_username) def assert_exception_redirect_looks_correct(self, expected_uri, auth_entry=None): """Tests middleware conditional redirection. @@ -147,49 +166,48 @@ def assert_exception_redirect_looks_correct(self, expected_uri, auth_entry=None) """ exception_middleware = middleware.ExceptionMiddleware(get_response=lambda request: None) request, _ = self.get_request_and_strategy(auth_entry=auth_entry) - response = exception_middleware.process_exception( - request, exceptions.AuthCanceled(request.backend)) - location = response.get('Location') + response = exception_middleware.process_exception(request, exceptions.AuthCanceled(request.backend)) + location = response.get("Location") assert 302 == response.status_code - assert 'canceled' in location + assert "canceled" in location assert self.backend_name in location - assert location.startswith(expected_uri + '?') + assert location.startswith(expected_uri + "?") def assert_json_failure_response_is_inactive_account(self, response): """Asserts failure on /login for inactive account looks right.""" assert 400 == response.status_code - payload = json.loads(response.content.decode('utf-8')) + payload = json.loads(response.content.decode("utf-8")) context = { - 'platformName': configuration_helpers.get_value('PLATFORM_NAME', settings.PLATFORM_NAME), - 'supportLink': configuration_helpers.get_value('SUPPORT_SITE_LINK', settings.SUPPORT_SITE_LINK) + "platformName": configuration_helpers.get_value("PLATFORM_NAME", settings.PLATFORM_NAME), + "supportLink": configuration_helpers.get_value("SUPPORT_SITE_LINK", settings.SUPPORT_SITE_LINK), } - assert not payload.get('success') - assert 'inactive-user' in payload.get('error_code') - assert context == payload.get('context') + assert not payload.get("success") + assert "inactive-user" in payload.get("error_code") + assert context == payload.get("context") def assert_json_failure_response_is_missing_social_auth(self, response): """Asserts failure on /login for missing social auth looks right.""" assert 403 == response.status_code - payload = json.loads(response.content.decode('utf-8')) - assert not payload.get('success') - assert payload.get('error_code') == 'third-party-auth-with-no-linked-account' + payload = json.loads(response.content.decode("utf-8")) + assert not payload.get("success") + assert payload.get("error_code") == "third-party-auth-with-no-linked-account" def assert_json_failure_response_is_username_collision(self, response): """Asserts the json response indicates a username collision.""" assert 409 == response.status_code - payload = json.loads(response.content.decode('utf-8')) - assert not payload.get('success') - assert 'It looks like this username is already taken' == payload['username'][0]['user_message'] + payload = json.loads(response.content.decode("utf-8")) + assert not payload.get("success") + assert "It looks like this username is already taken" == payload["username"][0]["user_message"] def assert_json_success_response_looks_correct(self, response, verify_redirect_url): """Asserts the json response indicates success and redirection.""" assert 200 == response.status_code - payload = json.loads(response.content.decode('utf-8')) - assert payload.get('success') + payload = json.loads(response.content.decode("utf-8")) + assert payload.get("success") if verify_redirect_url: - assert pipeline.get_complete_url(self.provider.backend_name) == payload.get('redirect_url') + assert pipeline.get_complete_url(self.provider.backend_name) == payload.get("redirect_url") def assert_login_response_before_pipeline_looks_correct(self, response): """Asserts a GET of /login not in the pipeline looks correct.""" @@ -218,19 +236,19 @@ def assert_redirect_after_pipeline_completes(self, response, expected_redirect_u assert 302 == response.status_code # NOTE: Ideally we should use assertRedirects(), however it errors out due to the hostname, testserver, # not being properly set. This may be an issue with the call made by PSA, but we are not certain. - assert response.get('Location').endswith( + assert response.get("Location").endswith( expected_redirect_url or django_settings.SOCIAL_AUTH_LOGIN_REDIRECT_URL ) def assert_redirect_to_login_looks_correct(self, response): """Asserts a response would redirect to /login.""" assert 302 == response.status_code - assert '/login' == response.get('Location') + assert "/login" == response.get("Location") def assert_redirect_to_register_looks_correct(self, response): """Asserts a response would redirect to /register.""" assert 302 == response.status_code - assert '/register' == response.get('Location') + assert "/register" == response.get("Location") def assert_register_response_before_pipeline_looks_correct(self, response): """Asserts a GET of /register not in the pipeline looks correct.""" @@ -241,43 +259,41 @@ def assert_register_response_before_pipeline_looks_correct(self, response): def assert_social_auth_does_not_exist_for_user(self, user, strategy): """Asserts a user does not have an auth with the expected provider.""" - social_auths = strategy.storage.user.get_social_auth_for_user( - user, provider=self.provider.backend_name) + social_auths = strategy.storage.user.get_social_auth_for_user(user, provider=self.provider.backend_name) assert 0 == len(social_auths) def assert_social_auth_exists_for_user(self, user, strategy): """Asserts a user has a social auth with the expected provider.""" - social_auths = strategy.storage.user.get_social_auth_for_user( - user, provider=self.provider.backend_name) + social_auths = strategy.storage.user.get_social_auth_for_user(user, provider=self.provider.backend_name) assert 1 == len(social_auths) assert self.backend_name == social_auths[0].provider def assert_logged_in_cookie_redirect(self, response): - """Verify that the user was redirected in order to set the logged in cookie. """ + """Verify that the user was redirected in order to set the logged in cookie.""" assert response.status_code == 302 - assert response['Location'] == pipeline.get_complete_url(self.provider.backend_name) - assert response.cookies[django_settings.EDXMKTG_LOGGED_IN_COOKIE_NAME].value == 'true' + assert response["Location"] == pipeline.get_complete_url(self.provider.backend_name) + assert response.cookies[django_settings.EDXMKTG_LOGGED_IN_COOKIE_NAME].value == "true" assert django_settings.EDXMKTG_USER_INFO_COOKIE_NAME in response.cookies @property def backend_name(self): - """ Shortcut for the backend name """ + """Shortcut for the backend name""" return self.provider.backend_name def get_registration_post_vars(self, overrides=None): """POST vars generated by the registration form.""" defaults = { - 'username': 'username', - 'name': 'First Last', - 'gender': '', - 'year_of_birth': '', - 'level_of_education': '', - 'goals': '', - 'honor_code': 'true', - 'terms_of_service': 'true', - 'password': 'password', - 'mailing_address': '', - 'email': 'user@email.com', + "username": "username", + "name": "First Last", + "gender": "", + "year_of_birth": "", + "level_of_education": "", + "goals": "", + "honor_code": "true", + "terms_of_service": "true", + "password": "password", + "mailing_address": "", + "email": "user@email.com", } if overrides: @@ -294,12 +310,13 @@ def get_request_and_strategy(self, auth_entry=None, redirect_uri=None): social_django.utils.strategy(). """ request = self.request_factory.get( - pipeline.get_complete_url(self.backend_name) + - '?redirect_state=redirect_state_value&code=code_value&state=state_value') + pipeline.get_complete_url(self.backend_name) + + "?redirect_state=redirect_state_value&code=code_value&state=state_value" + ) request.site = SiteFactory.create() request.user = auth_models.AnonymousUser() request.session = cache.SessionStore() - request.session[self.backend_name + '_state'] = 'state_value' + request.session[self.backend_name + "_state"] = "state_value" if auth_entry: request.session[pipeline.AUTH_ENTRY_KEY] = auth_entry @@ -312,7 +329,7 @@ def get_request_and_strategy(self, auth_entry=None, redirect_uri=None): def _get_login_post_request(self, strategy): """Gets a fully-configured login POST request given a strategy and pipeline.""" - request = self.request_factory.post(reverse('login_api')) + request = self.request_factory.post(reverse("login_api")) # Note: The shared GET request can't be used for login, which is now POST-only, # so this POST request is given a copy of all configuration from the GET request @@ -329,7 +346,7 @@ def _get_login_post_request(self, strategy): def _patch_edxmako_current_request(self, request): """Make ``request`` be the current request for edxmako template rendering.""" - with mock.patch('common.djangoapps.edxmako.request_context.get_current_request', return_value=request): + with mock.patch("common.djangoapps.edxmako.request_context.get_current_request", return_value=request): yield def get_user_by_email(self, strategy, email): @@ -337,11 +354,13 @@ def get_user_by_email(self, strategy, email): return strategy.storage.user.user_model().objects.get(email=email) def set_logged_in_cookies(self, request): - """Simulate setting the marketing site cookie on the request. """ - request.COOKIES[django_settings.EDXMKTG_LOGGED_IN_COOKIE_NAME] = 'true' - request.COOKIES[django_settings.EDXMKTG_USER_INFO_COOKIE_NAME] = json.dumps({ - 'version': django_settings.EDXMKTG_USER_INFO_COOKIE_VERSION, - }) + """Simulate setting the marketing site cookie on the request.""" + request.COOKIES[django_settings.EDXMKTG_LOGGED_IN_COOKIE_NAME] = "true" + request.COOKIES[django_settings.EDXMKTG_USER_INFO_COOKIE_NAME] = json.dumps( + { + "version": django_settings.EDXMKTG_USER_INFO_COOKIE_VERSION, + } + ) def create_user_models_for_existing_account(self, strategy, email, password, username, skip_social_auth=False): """Creates user, profile, registration, and (usually) social auth. @@ -371,10 +390,10 @@ def fake_auth_complete(self, strategy): """ args = () kwargs = { - 'request': strategy.request, - 'backend': strategy.request.backend, - 'user': None, - 'response': self.get_response_data(), + "request": strategy.request, + "backend": strategy.request.backend, + "user": None, + "response": self.get_response_data(), } return strategy.authenticate(*args, **kwargs) @@ -386,6 +405,7 @@ class IntegrationTestMixin(testutil.TestCase, test.TestCase, HelperMixin): currently less comprehensive. Some providers are tested with this, others with IntegrationTest. """ + # Provider information: PROVIDER_NAME = "override" PROVIDER_BACKEND = "override" @@ -399,8 +419,8 @@ def setUp(self): super().setUp() self.request_factory = test.RequestFactory() - self.login_page_url = reverse('signin_user') - self.register_page_url = reverse('register_user') + self.login_page_url = reverse("signin_user") + self.register_page_url = reverse("register_user") patcher = testutil.patch_mako_templates() patcher.start() self.addCleanup(patcher.stop) @@ -415,47 +435,44 @@ def _test_register(self, **extra_defaults): try_login_response = self.client.get(provider_register_url) # The user should be redirected to the provider's login page: assert try_login_response.status_code == 302 - provider_response = self.do_provider_login(try_login_response['Location']) + provider_response = self.do_provider_login(try_login_response["Location"]) # We should be redirected to the register screen since this account is not linked to an edX account: assert provider_response.status_code == 302 - assert provider_response['Location'] == self.register_page_url + assert provider_response["Location"] == self.register_page_url register_response = self.client.get(self.register_page_url) tpa_context = register_response.context["data"]["third_party_auth"] - assert tpa_context['errorMessage'] is None + assert tpa_context["errorMessage"] is None # Check that the "You've successfully signed into [PROVIDER_NAME]" message is shown. - assert tpa_context['currentProvider'] == self.PROVIDER_NAME + assert tpa_context["currentProvider"] == self.PROVIDER_NAME # Check that the data (e.g. email) from the provider is displayed in the form: - form_data = register_response.context['data']['registration_form_desc'] - form_fields = {field['name']: field for field in form_data['fields']} - assert form_fields['email']['defaultValue'] == self.USER_EMAIL - assert form_fields['name']['defaultValue'] == self.USER_NAME - assert form_fields['username']['defaultValue'] == self.USER_USERNAME + form_data = register_response.context["data"]["registration_form_desc"] + form_fields = {field["name"]: field for field in form_data["fields"]} + assert form_fields["email"]["defaultValue"] == self.USER_EMAIL + assert form_fields["name"]["defaultValue"] == self.USER_NAME + assert form_fields["username"]["defaultValue"] == self.USER_USERNAME for field_name, value in extra_defaults.items(): - assert form_fields[field_name]['defaultValue'] == value + assert form_fields[field_name]["defaultValue"] == value registration_values = { - 'email': 'email-edited@tpa-test.none', - 'name': 'My Customized Name', - 'username': 'new_username', - 'honor_code': True, + "email": "email-edited@tpa-test.none", + "name": "My Customized Name", + "username": "new_username", + "honor_code": True, } # Now complete the form: - ajax_register_response = self.client.post( - reverse('user_api_registration'), - registration_values - ) + ajax_register_response = self.client.post(reverse("user_api_registration"), registration_values) assert ajax_register_response.status_code == 200 # Then the AJAX will finish the third party auth: continue_response = self.client.get(tpa_context["finishAuthUrl"]) # And we should be redirected to the dashboard: assert continue_response.status_code == 302 - assert continue_response['Location'] == reverse('dashboard') + assert continue_response["Location"] == reverse("dashboard") # Now check that we can login again, whether or not we have yet verified the account: self.client.logout() self._test_return_login(user_is_activated=False) self.client.logout() - self.verify_user_email('email-edited@tpa-test.none') + self.verify_user_email("email-edited@tpa-test.none") self._test_return_login(user_is_activated=True) def _test_login(self): @@ -468,27 +485,27 @@ def _test_login(self): try_login_response = self.client.get(provider_login_url) # The user should be redirected to the provider's login page: assert try_login_response.status_code == 302 - complete_response = self.do_provider_login(try_login_response['Location']) + complete_response = self.do_provider_login(try_login_response["Location"]) # We should be redirected to the login screen since this account is not linked to an edX account: assert complete_response.status_code == 302 - assert complete_response['Location'] == self.login_page_url + assert complete_response["Location"] == self.login_page_url login_response = self.client.get(self.login_page_url) tpa_context = login_response.context["data"]["third_party_auth"] - assert tpa_context['errorMessage'] is None + assert tpa_context["errorMessage"] is None # Check that the "You've successfully signed into [PROVIDER_NAME]" message is shown. - assert tpa_context['currentProvider'] == self.PROVIDER_NAME + assert tpa_context["currentProvider"] == self.PROVIDER_NAME # Now the user enters their username and password. # The AJAX on the page will log them in: ajax_login_response = self.client.post( - reverse('user_api_login_session', kwargs={'api_version': 'v1'}), - {'email': self.user.email, 'password': 'Password1234'} + reverse("user_api_login_session", kwargs={"api_version": "v1"}), + {"email": self.user.email, "password": "Password1234"}, ) assert ajax_login_response.status_code == 200 # Then the AJAX will finish the third party auth: continue_response = self.client.get(tpa_context["finishAuthUrl"]) # And we should be redirected to the dashboard: assert continue_response.status_code == 302 - assert continue_response['Location'] == reverse('dashboard') + assert continue_response["Location"] == reverse("dashboard") # Now check that we can login again: self.client.logout() @@ -502,9 +519,9 @@ def do_provider_login(self, provider_redirect_url): raise NotImplementedError def _test_return_login(self, user_is_activated=True, previous_session_timed_out=False): - """ Test logging in to an account that is already linked. """ + """Test logging in to an account that is already linked.""" # Make sure we're not logged in: - dashboard_response = self.client.get(reverse('dashboard')) + dashboard_response = self.client.get(reverse("dashboard")) assert dashboard_response.status_code == 302 # The user goes to the login page, and sees a button to login with this provider: provider_login_url = self._check_login_page() @@ -512,22 +529,22 @@ def _test_return_login(self, user_is_activated=True, previous_session_timed_out= try_login_response = self.client.get(provider_login_url) # The user should be redirected to the provider: assert try_login_response.status_code == 302 - login_response = self.do_provider_login(try_login_response['Location']) + login_response = self.do_provider_login(try_login_response["Location"]) # If the previous session was manually logged out, there will be one weird redirect # required to set the login cookie (it sticks around if the main session times out): if not previous_session_timed_out: assert login_response.status_code == 302 - assert login_response['Location'] == (self.complete_url + '?') + assert login_response["Location"] == (self.complete_url + "?") # And then we should be redirected to the dashboard: - login_response = self.client.get(login_response['Location']) + login_response = self.client.get(login_response["Location"]) assert login_response.status_code == 302 if user_is_activated: - url_expected = reverse('dashboard') + url_expected = reverse("dashboard") else: - url_expected = reverse('third_party_inactive_redirect') + '?next=' + reverse('dashboard') - assert login_response['Location'] == url_expected + url_expected = reverse("third_party_inactive_redirect") + "?next=" + reverse("dashboard") + assert login_response["Location"] == url_expected # Now we are logged in: - dashboard_response = self.client.get(reverse('dashboard')) + dashboard_response = self.client.get(reverse("dashboard")) assert dashboard_response.status_code == 200 def _check_login_page(self): @@ -545,22 +562,23 @@ def _check_register_page(self): return self._check_login_or_register_page(self.register_page_url, "registerUrl") def _check_login_or_register_page(self, url, url_to_return): - """ Shared logic for _check_login_page() and _check_register_page() """ + """Shared logic for _check_login_page() and _check_register_page()""" response = self.client.get(url) self.assertContains(response, self.PROVIDER_NAME) - context_data = response.context['data']['third_party_auth'] - provider_urls = {provider['id']: provider[url_to_return] for provider in context_data['providers']} + context_data = response.context["data"]["third_party_auth"] + provider_urls = {provider["id"]: provider[url_to_return] for provider in context_data["providers"]} assert self.PROVIDER_ID in provider_urls return provider_urls[self.PROVIDER_ID] @property def complete_url(self): - """ Get the auth completion URL for this provider """ - return reverse('social:complete', kwargs={'backend': self.PROVIDER_BACKEND}) + """Get the auth completion URL for this provider""" + return reverse("social:complete", kwargs={"backend": self.PROVIDER_BACKEND}) @unittest.skipUnless( - testutil.AUTH_FEATURES_KEY in django_settings.FEATURES, testutil.AUTH_FEATURES_KEY + ' not in settings.FEATURES') + testutil.AUTH_FEATURES_KEY in django_settings.FEATURES, testutil.AUTH_FEATURES_KEY + " not in settings.FEATURES" +) @django_utils.override_settings() # For settings reversion on a method-by-method basis. class IntegrationTest(testutil.TestCase, test.TestCase, HelperMixin): """Abstract base class for provider integration tests.""" @@ -572,46 +590,51 @@ def setUp(self): # Actual tests, executed once per child. def test_canceling_authentication_redirects_to_login_when_auth_entry_login(self): - self.assert_exception_redirect_looks_correct('/login', auth_entry=pipeline.AUTH_ENTRY_LOGIN) + self.assert_exception_redirect_looks_correct("/login", auth_entry=pipeline.AUTH_ENTRY_LOGIN) def test_canceling_authentication_redirects_to_register_when_auth_entry_register(self): - self.assert_exception_redirect_looks_correct('/register', auth_entry=pipeline.AUTH_ENTRY_REGISTER) + self.assert_exception_redirect_looks_correct("/register", auth_entry=pipeline.AUTH_ENTRY_REGISTER) def test_canceling_authentication_redirects_to_account_settings_when_auth_entry_account_settings(self): self.assert_exception_redirect_looks_correct( - '/account/settings', auth_entry=pipeline.AUTH_ENTRY_ACCOUNT_SETTINGS + "/account/settings", auth_entry=pipeline.AUTH_ENTRY_ACCOUNT_SETTINGS ) def test_canceling_authentication_redirects_to_root_when_auth_entry_not_set(self): - self.assert_exception_redirect_looks_correct('/') + self.assert_exception_redirect_looks_correct("/") - @mock.patch('common.djangoapps.third_party_auth.pipeline.segment.track') + @mock.patch("common.djangoapps.third_party_auth.pipeline.segment.track") def test_full_pipeline_succeeds_for_linking_account(self, _mock_segment_track): # First, create, the GET request and strategy that store pipeline state, # configure the backend, and mock out wire traffic. get_request, strategy = self.get_request_and_strategy( - auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete') + auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete" + ) get_request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy)) get_request.user = self.create_user_models_for_existing_account( - strategy, 'user@example.com', 'password', self.get_username(), skip_social_auth=True) - partial_pipeline_token = strategy.session_get('partial_pipeline_token') + strategy, "user@example.com", "password", self.get_username(), skip_social_auth=True + ) + partial_pipeline_token = strategy.session_get("partial_pipeline_token") partial_data = strategy.storage.partial.load(partial_pipeline_token) # Instrument the pipeline to get to the dashboard with the full # expected state. - self.client.get( - pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN)) - actions.do_complete(get_request.backend, social_views._do_login, # pylint: disable=protected-access - request=get_request) + self.client.get(pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN)) + actions.do_complete( + get_request.backend, social_views._do_login, request=get_request # pylint: disable=protected-access + ) post_request = self._get_login_post_request(strategy) login_user(post_request) - actions.do_complete(post_request.backend, social_views._do_login, # pylint: disable=protected-access, no-member - request=post_request) + actions.do_complete( + post_request.backend, + social_views._do_login, # pylint: disable=protected-access, no-member + request=post_request, + ) # First we expect that we're in the unlinked state, and that there # really is no association in the backend. - self.assert_account_settings_context_looks_correct(account_settings_context(get_request), linked=False) + self.assert_third_party_accounts_state(get_request, linked=False) self.assert_social_auth_does_not_exist_for_user(get_request.user, strategy) # We should be redirected back to the complete page, setting @@ -630,16 +653,18 @@ def test_full_pipeline_succeeds_for_linking_account(self, _mock_segment_track): # Now we expect to be in the linked state, with a backend entry. self.assert_social_auth_exists_for_user(get_request.user, strategy) - self.assert_account_settings_context_looks_correct(account_settings_context(get_request), linked=True) + self.assert_third_party_accounts_state(get_request, linked=True) def test_full_pipeline_succeeds_for_unlinking_account(self): # First, create, the GET request and strategy that store pipeline state, # configure the backend, and mock out wire traffic. get_request, strategy = self.get_request_and_strategy( - auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete') + auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete" + ) get_request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy)) user = self.create_user_models_for_existing_account( - strategy, 'user@example.com', 'password', self.get_username()) + strategy, "user@example.com", "password", self.get_username() + ) self.assert_social_auth_exists_for_user(user, strategy) # We're already logged in, so simulate that the cookie is set correctly @@ -647,36 +672,37 @@ def test_full_pipeline_succeeds_for_unlinking_account(self): # Instrument the pipeline to get to the dashboard with the full # expected state. - self.client.get( - pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN)) - actions.do_complete(get_request.backend, social_views._do_login, # pylint: disable=protected-access - request=get_request) + self.client.get(pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN)) + actions.do_complete( + get_request.backend, social_views._do_login, request=get_request # pylint: disable=protected-access + ) post_request = self._get_login_post_request(strategy) with self._patch_edxmako_current_request(post_request): login_user(post_request) - actions.do_complete(post_request.backend, social_views._do_login, user=user, # pylint: disable=protected-access, no-member - request=post_request) + actions.do_complete( + post_request.backend, + social_views._do_login, # pylint: disable=protected-access + user=user, # pylint: disable=no-member + request=post_request, + ) # Copy the user that was set on the post_request object back to the original get_request object. get_request.user = post_request.user # First we expect that we're in the linked state, with a backend entry. - self.assert_account_settings_context_looks_correct(account_settings_context(get_request), linked=True) + self.assert_third_party_accounts_state(get_request, linked=True) self.assert_social_auth_exists_for_user(get_request.user, strategy) # Fire off the disconnect pipeline to unlink. self.assert_redirect_after_pipeline_completes( actions.do_disconnect( - get_request.backend, - get_request.user, - None, - redirect_field_name=auth.REDIRECT_FIELD_NAME + get_request.backend, get_request.user, None, redirect_field_name=auth.REDIRECT_FIELD_NAME ) ) # Now we expect to be in the unlinked state, with no backend entry. - self.assert_account_settings_context_looks_correct(account_settings_context(get_request), linked=False) + self.assert_third_party_accounts_state(get_request, linked=False) self.assert_social_auth_does_not_exist_for_user(user, strategy) def test_linking_already_associated_account_raises_auth_already_associated(self): @@ -684,16 +710,18 @@ def test_linking_already_associated_account_raises_auth_already_associated(self) # test_already_associated_exception_populates_dashboard_with_error. It # verifies the exception gets raised when we expect; the latter test # covers exception handling. - email = 'user@example.com' - password = 'password' + email = "user@example.com" + password = "password" username = self.get_username() _, strategy = self.get_request_and_strategy( - auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete') + auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete" + ) backend = strategy.request.backend backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy)) linked_user = self.create_user_models_for_existing_account(strategy, email, password, username) unlinked_user = social_utils.Storage.user.create_user( - email='other_' + email, password=password, username='other_' + username) + email="other_" + email, password=password, username="other_" + username + ) self.assert_social_auth_exists_for_user(linked_user, strategy) self.assert_social_auth_does_not_exist_for_user(unlinked_user, strategy) @@ -711,42 +739,50 @@ def test_already_associated_exception_populates_dashboard_with_error(self): # covered in other tests. Using linked=True does, however, let us test # that the duplicate error has no effect on the state of the controls. get_request, strategy = self.get_request_and_strategy( - auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete') + auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete" + ) strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy)) user = self.create_user_models_for_existing_account( - strategy, 'user@example.com', 'password', self.get_username()) + strategy, "user@example.com", "password", self.get_username() + ) self.assert_social_auth_exists_for_user(user, strategy) - self.client.get('/login') + self.client.get("/login") self.client.get(pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN)) - actions.do_complete(get_request.backend, social_views._do_login, # pylint: disable=protected-access - request=get_request) + actions.do_complete( + get_request.backend, social_views._do_login, request=get_request # pylint: disable=protected-access + ) post_request = self._get_login_post_request(strategy) with self._patch_edxmako_current_request(post_request): login_user(post_request) - actions.do_complete(post_request.backend, social_views._do_login, # pylint: disable=protected-access, no-member - user=user, request=post_request) + actions.do_complete( + post_request.backend, + social_views._do_login, # pylint: disable=protected-access, no-member + user=user, + request=post_request, + ) # Monkey-patch storage for messaging; pylint: disable=protected-access post_request._messages = fallback.FallbackStorage(post_request) middleware.ExceptionMiddleware(get_response=lambda request: None).process_exception( - post_request, - exceptions.AuthAlreadyAssociated(self.provider.backend_name, 'account is already in use.')) + post_request, exceptions.AuthAlreadyAssociated(self.provider.backend_name, "account is already in use.") + ) - self.assert_account_settings_context_looks_correct( - account_settings_context(post_request), duplicate=True, linked=True) + self.assert_third_party_accounts_state(post_request, duplicate=True, linked=True) - @mock.patch('common.djangoapps.third_party_auth.pipeline.segment.track') + @mock.patch("common.djangoapps.third_party_auth.pipeline.segment.track") def test_full_pipeline_succeeds_for_signing_in_to_existing_active_account(self, _mock_segment_track): # First, create, the GET request and strategy that store pipeline state, # configure the backend, and mock out wire traffic. get_request, strategy = self.get_request_and_strategy( - auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete') + auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete" + ) strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy)) user = self.create_user_models_for_existing_account( - strategy, 'user@example.com', 'password', self.get_username()) - partial_pipeline_token = strategy.session_get('partial_pipeline_token') + strategy, "user@example.com", "password", self.get_username() + ) + partial_pipeline_token = strategy.session_get("partial_pipeline_token") partial_data = strategy.storage.partial.load(partial_pipeline_token) self.assert_social_auth_exists_for_user(user, strategy) @@ -754,19 +790,21 @@ def test_full_pipeline_succeeds_for_signing_in_to_existing_active_account(self, # Begin! Ensure that the login form contains expected controls before # the user starts the pipeline. - self.assert_login_response_before_pipeline_looks_correct(self.client.get('/login')) + self.assert_login_response_before_pipeline_looks_correct(self.client.get("/login")) # The pipeline starts by a user GETting /auth/login/. # Synthesize that request and check that it redirects to the correct # provider page. - self.assert_redirect_to_provider_looks_correct(self.client.get( - pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN))) + self.assert_redirect_to_provider_looks_correct( + self.client.get(pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN)) + ) # Next, the provider makes a request against /auth/complete/ # to resume the pipeline. # pylint: disable=protected-access - self.assert_redirect_to_login_looks_correct(actions.do_complete(get_request.backend, social_views._do_login, - request=get_request)) + self.assert_redirect_to_login_looks_correct( + actions.do_complete(get_request.backend, social_views._do_login, request=get_request) + ) # At this point we know the pipeline has resumed correctly. Next we # fire off the view that displays the login form and posts it via JS. @@ -781,10 +819,16 @@ def test_full_pipeline_succeeds_for_signing_in_to_existing_active_account(self, # We should be redirected back to the complete page, setting # the "logged in" cookie for the marketing site. - self.assert_logged_in_cookie_redirect(actions.do_complete( - post_request.backend, social_views._do_login, post_request.user, None, # pylint: disable=protected-access, no-member - redirect_field_name=auth.REDIRECT_FIELD_NAME, request=post_request - )) + self.assert_logged_in_cookie_redirect( + actions.do_complete( + post_request.backend, + social_views._do_login, + post_request.user, + None, # pylint: disable=protected-access, no-member + redirect_field_name=auth.REDIRECT_FIELD_NAME, + request=post_request, + ) + ) # Set the cookie and try again self.set_logged_in_cookies(get_request) @@ -795,14 +839,16 @@ def test_full_pipeline_succeeds_for_signing_in_to_existing_active_account(self, self.assert_redirect_after_pipeline_completes( self.do_complete(strategy, get_request, partial_pipeline_token, partial_data, user) ) - self.assert_account_settings_context_looks_correct(account_settings_context(get_request)) + self.assert_third_party_accounts_state(get_request) def test_signin_fails_if_account_not_active(self): _, strategy = self.get_request_and_strategy( - auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete') + auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete" + ) strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy)) - user = self.create_user_models_for_existing_account(strategy, 'user@example.com', 'password', - self.get_username()) + user = self.create_user_models_for_existing_account( + strategy, "user@example.com", "password", self.get_username() + ) user.is_active = False user.save() @@ -813,25 +859,28 @@ def test_signin_fails_if_account_not_active(self): def test_signin_fails_if_no_account_associated(self): _, strategy = self.get_request_and_strategy( - auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete') + auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete" + ) strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy)) self.create_user_models_for_existing_account( - strategy, 'user@example.com', 'password', self.get_username(), skip_social_auth=True) + strategy, "user@example.com", "password", self.get_username(), skip_social_auth=True + ) post_request = self._get_login_post_request(strategy) self.assert_json_failure_response_is_missing_social_auth(login_user(post_request)) def test_signin_associates_user_if_oauth_provider_and_tpa_is_required(self): - username, email, password = self.get_username(), 'user@example.com', 'password' + username, email, password = self.get_username(), "user@example.com", "password" _, strategy = self.get_request_and_strategy( - auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete') + auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete" + ) user = self.create_user_models_for_existing_account(strategy, email, password, username, skip_social_auth=True) with mock.patch( - 'common.djangoapps.third_party_auth.pipeline.get_associated_user_by_email_response', - return_value=[{'user': user}, True], + "common.djangoapps.third_party_auth.pipeline.get_associated_user_by_email_response", + return_value=[{"user": user}, True], ): strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy)) @@ -839,30 +888,37 @@ def test_signin_associates_user_if_oauth_provider_and_tpa_is_required(self): self.assert_json_success_response_looks_correct(login_user(post_request), verify_redirect_url=True) def test_first_party_auth_trumps_third_party_auth_but_is_invalid_when_only_email_in_request(self): - self.assert_first_party_auth_trumps_third_party_auth(email='user@example.com') + self.assert_first_party_auth_trumps_third_party_auth(email="user@example.com") def test_first_party_auth_trumps_third_party_auth_but_is_invalid_when_only_password_in_request(self): - self.assert_first_party_auth_trumps_third_party_auth(password='password') + self.assert_first_party_auth_trumps_third_party_auth(password="password") def test_first_party_auth_trumps_third_party_auth_and_fails_when_credentials_bad(self): self.assert_first_party_auth_trumps_third_party_auth( - email='user@example.com', password='password', success=False) + email="user@example.com", password="password", success=False + ) def test_first_party_auth_trumps_third_party_auth_and_succeeds_when_credentials_good(self): self.assert_first_party_auth_trumps_third_party_auth( - email='user@example.com', password='password', success=True) + email="user@example.com", password="password", success=True + ) def test_pipeline_redirects_to_requested_url(self): - requested_redirect_url = 'foo' # something different from '/dashboard' - request, strategy = self.get_request_and_strategy(redirect_uri='social:complete') + requested_redirect_url = "foo" # something different from '/dashboard' + request, strategy = self.get_request_and_strategy(redirect_uri="social:complete") strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy)) request.session[pipeline.AUTH_REDIRECT_KEY] = requested_redirect_url - user = self.create_user_models_for_existing_account(strategy, 'user@foo.com', 'password', self.get_username()) + user = self.create_user_models_for_existing_account(strategy, "user@foo.com", "password", self.get_username()) self.set_logged_in_cookies(request) self.assert_redirect_after_pipeline_completes( - actions.do_complete(request.backend, social_views._do_login, user=user, request=request), # pylint: disable=protected-access + actions.do_complete( + request.backend, + social_views._do_login, # pylint: disable=protected-access + user=user, + request=request, + ), requested_redirect_url, ) @@ -870,44 +926,47 @@ def test_full_pipeline_succeeds_registering_new_account(self): # First, create, the request and strategy that store pipeline state. # Mock out wire traffic. request, strategy = self.get_request_and_strategy( - auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri='social:complete') + auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri="social:complete" + ) strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy)) - partial_pipeline_token = strategy.session_get('partial_pipeline_token') + partial_pipeline_token = strategy.session_get("partial_pipeline_token") partial_data = strategy.storage.partial.load(partial_pipeline_token) # Begin! Grab the registration page and check the login control on it. - self.assert_register_response_before_pipeline_looks_correct(self.client.get('/register')) + self.assert_register_response_before_pipeline_looks_correct(self.client.get("/register")) # The pipeline starts by a user GETting /auth/login/. # Synthesize that request and check that it redirects to the correct # provider page. - self.assert_redirect_to_provider_looks_correct(self.client.get( - pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN))) + self.assert_redirect_to_provider_looks_correct( + self.client.get(pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN)) + ) # Next, the provider makes a request against /auth/complete/. # pylint: disable=protected-access - self.assert_redirect_to_register_looks_correct(actions.do_complete(request.backend, social_views._do_login, - request=request)) + self.assert_redirect_to_register_looks_correct( + actions.do_complete(request.backend, social_views._do_login, request=request) + ) # At this point we know the pipeline has resumed correctly. Next we # fire off the view that displays the registration form. with self._patch_edxmako_current_request(request): self.assert_register_form_populates_unicode_username_correctly(request) self.assert_register_response_in_pipeline_looks_correct( - login_and_registration_form(strategy.request, initial_mode='register'), - pipeline.get(request)['kwargs'], - ['name', 'username', 'email'] + login_and_registration_form(strategy.request, initial_mode="register"), + pipeline.get(request)["kwargs"], + ["name", "username", "email"], ) # Next, we invoke the view that handles the POST. Not all providers # supply email. Manually add it as the user would have to; this # also serves as a test of overriding provider values. Always provide a # password for us to check that we override it properly. - overridden_password = strategy.request.POST.get('password') - email = 'new@example.com' + overridden_password = strategy.request.POST.get("password") + email = "new@example.com" - if not strategy.request.POST.get('email'): - strategy.request.POST = self.get_registration_post_vars({'email': email}) + if not strategy.request.POST.get("email"): + strategy.request.POST = self.get_registration_post_vars({"email": email}) # The user must not exist yet... with pytest.raises(auth_models.User.DoesNotExist): @@ -935,41 +994,44 @@ def test_full_pipeline_succeeds_registering_new_account(self): self.assert_redirect_after_pipeline_completes( self.do_complete(strategy, request, partial_pipeline_token, partial_data, created_user) ) - # Now the user has been redirected to the dashboard. Their third party account should now be linked. + # Their third party account should now be linked. self.assert_social_auth_exists_for_user(created_user, strategy) - self.assert_account_settings_context_looks_correct(account_settings_context(request), linked=True) + self.assert_third_party_accounts_state(request, linked=True) def test_new_account_registration_assigns_distinct_username_on_collision(self): original_username = self.get_username() request, strategy = self.get_request_and_strategy( - auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri='social:complete') + auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri="social:complete" + ) # Create a colliding username in the backend, then proceed with # assignment via pipeline to make sure a distinct username is created. - strategy.storage.user.create_user(username=self.get_username(), email='user@email.com', password='password') + strategy.storage.user.create_user(username=self.get_username(), email="user@email.com", password="password") backend = strategy.request.backend backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy)) # pylint: disable=protected-access response = actions.do_complete(backend, social_views._do_login, request=request) assert response.status_code == 302 - response = json.loads(create_account(strategy.request).content.decode('utf-8')) - assert response['username'] != original_username + response = json.loads(create_account(strategy.request).content.decode("utf-8")) + assert response["username"] != original_username def test_new_account_registration_fails_if_email_exists(self): request, strategy = self.get_request_and_strategy( - auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri='social:complete') + auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri="social:complete" + ) backend = strategy.request.backend backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy)) # pylint: disable=protected-access - self.assert_redirect_to_register_looks_correct(actions.do_complete(backend, social_views._do_login, - request=request)) + self.assert_redirect_to_register_looks_correct( + actions.do_complete(backend, social_views._do_login, request=request) + ) with self._patch_edxmako_current_request(request): self.assert_register_response_in_pipeline_looks_correct( - login_and_registration_form(strategy.request, initial_mode='register'), - pipeline.get(request)['kwargs'], - ['name', 'username', 'email'] + login_and_registration_form(strategy.request, initial_mode="register"), + pipeline.get(request)["kwargs"], + ["name", "username", "email"], ) with self._patch_edxmako_current_request(strategy.request): @@ -979,18 +1041,18 @@ def test_new_account_registration_fails_if_email_exists(self): self.assert_json_failure_response_is_username_collision(create_account(strategy.request)) def test_pipeline_raises_auth_entry_error_if_auth_entry_invalid(self): - auth_entry = 'invalid' + auth_entry = "invalid" assert auth_entry not in pipeline._AUTH_ENTRY_CHOICES # pylint: disable=protected-access - _, strategy = self.get_request_and_strategy(auth_entry=auth_entry, redirect_uri='social:complete') + _, strategy = self.get_request_and_strategy(auth_entry=auth_entry, redirect_uri="social:complete") with pytest.raises(pipeline.AuthEntryError): strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy)) def test_pipeline_assumes_login_if_auth_entry_missing(self): - _, strategy = self.get_request_and_strategy(auth_entry=None, redirect_uri='social:complete') + _, strategy = self.get_request_and_strategy(auth_entry=None, redirect_uri="social:complete") response = self.fake_auth_complete(strategy) - assert response.url == reverse('signin_user') + assert response.url == reverse("signin_user") def assert_first_party_auth_trumps_third_party_auth(self, email=None, password=None, success=None): """Asserts first party auth was used in place of third party auth. @@ -1004,33 +1066,35 @@ def assert_first_party_auth_trumps_third_party_auth(self, email=None, password=N one of username or password will be missing). """ _, strategy = self.get_request_and_strategy( - auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete') + auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete" + ) strategy.request.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy)) self.create_user_models_for_existing_account( - strategy, email, password, self.get_username(), skip_social_auth=True) + strategy, email, password, self.get_username(), skip_social_auth=True + ) post_request = self._get_login_post_request(strategy) post_request.POST = dict(post_request.POST) if email: - post_request.POST['email'] = email + post_request.POST["email"] = email if password: - post_request.POST['password'] = 'bad_' + password if success is False else password + post_request.POST["password"] = "bad_" + password if success is False else password self.assert_pipeline_running(post_request) - payload = json.loads(login_user(post_request).content.decode('utf-8')) + payload = json.loads(login_user(post_request).content.decode("utf-8")) if success is None: # Request malformed -- just one of email/password given. - assert not payload.get('success') - assert 'There was an error receiving your login information' in payload.get('value') + assert not payload.get("success") + assert "There was an error receiving your login information" in payload.get("value") elif success: # Request well-formed and credentials good. - assert payload.get('success') + assert payload.get("success") else: # Request well-formed but credentials bad. - assert not payload.get('success') - assert 'incorrect' in payload.get('value') + assert not payload.get("success") + assert "incorrect" in payload.get("value") def get_response_data(self): """Gets a dict of response data of the form given by the provider. @@ -1064,8 +1128,13 @@ def do_complete(self, strategy, request, partial_pipeline_token, partial_data, u if not user: user = request.user return actions.do_complete( - request.backend, social_views._do_login, user, None, # pylint: disable=protected-access - redirect_field_name=auth.REDIRECT_FIELD_NAME, request=request, partial_token=partial_pipeline_token + request.backend, + social_views._do_login, # pylint: disable=protected-access + user, + None, + redirect_field_name=auth.REDIRECT_FIELD_NAME, + request=request, + partial_token=partial_pipeline_token, ) diff --git a/common/djangoapps/third_party_auth/tests/specs/test_testshib.py b/common/djangoapps/third_party_auth/tests/specs/test_testshib.py index ec3efd8286e7..caddd325ba76 100644 --- a/common/djangoapps/third_party_auth/tests/specs/test_testshib.py +++ b/common/djangoapps/third_party_auth/tests/specs/test_testshib.py @@ -2,7 +2,6 @@ Third_party_auth integration tests using a mock version of the TestShib provider """ - import datetime import json import logging @@ -27,16 +26,15 @@ from common.djangoapps.third_party_auth.saml import log as saml_log from common.djangoapps.third_party_auth.tasks import fetch_saml_metadata from common.djangoapps.third_party_auth.tests import testutil, utils -from openedx.core.djangoapps.user_api.accounts.settings_views import account_settings_context from openedx.core.djangoapps.user_authn.views.login import login_user from openedx.features.enterprise_support.tests.factories import EnterpriseCustomerFactory from .base import IntegrationTestMixin -TESTSHIB_ENTITY_ID = 'https://idp.testshib.org/idp/shibboleth' -TESTSHIB_METADATA_URL = 'https://mock.testshib.org/metadata/testshib-providers.xml' -TESTSHIB_METADATA_URL_WITH_CACHE_DURATION = 'https://mock.testshib.org/metadata/testshib-providers-cache.xml' -TESTSHIB_SSO_URL = 'https://idp.testshib.org/idp/profile/SAML2/Redirect/SSO' +TESTSHIB_ENTITY_ID = "https://idp.testshib.org/idp/shibboleth" +TESTSHIB_METADATA_URL = "https://mock.testshib.org/metadata/testshib-providers.xml" +TESTSHIB_METADATA_URL_WITH_CACHE_DURATION = "https://mock.testshib.org/metadata/testshib-providers-cache.xml" +TESTSHIB_SSO_URL = "https://idp.testshib.org/idp/profile/SAML2/Redirect/SSO" class SamlIntegrationTestUtilities: @@ -44,6 +42,7 @@ class SamlIntegrationTestUtilities: Class contains methods particular to SAML integration testing so that they can be separated out from the actual test methods. """ + PROVIDER_ID = "saml-testshib" PROVIDER_NAME = "TestShib" PROVIDER_BACKEND = "tpa-saml" @@ -67,51 +66,59 @@ def setUp(self): self.addCleanup(httpretty.disable) # lint-amnesty, pylint: disable=no-member def metadata_callback(_request, _uri, headers): - """ Return a cached copy of TestShib's metadata by reading it from disk """ - return (200, headers, self.read_data_file('testshib_metadata.xml')) # lint-amnesty, pylint: disable=no-member + """Return a cached copy of TestShib's metadata by reading it from disk""" + return ( + 200, + headers, + self.read_data_file("testshib_metadata.xml"), + ) # lint-amnesty, pylint: disable=no-member - httpretty.register_uri(httpretty.GET, TESTSHIB_METADATA_URL, content_type='text/xml', body=metadata_callback) + httpretty.register_uri(httpretty.GET, TESTSHIB_METADATA_URL, content_type="text/xml", body=metadata_callback) def cache_duration_metadata_callback(_request, _uri, headers): """Return a cached copy of TestShib's metadata with a cacheDuration attribute""" - return (200, headers, self.read_data_file('testshib_metadata_with_cache_duration.xml')) # lint-amnesty, pylint: disable=no-member + return ( + 200, + headers, + self.read_data_file("testshib_metadata_with_cache_duration.xml"), + ) # lint-amnesty, pylint: disable=no-member httpretty.register_uri( httpretty.GET, TESTSHIB_METADATA_URL_WITH_CACHE_DURATION, - content_type='text/xml', - body=cache_duration_metadata_callback + content_type="text/xml", + body=cache_duration_metadata_callback, ) # Configure the SAML library to use the same request ID for every request. # Doing this and freezing the time allows us to play back recorded request/response pairs - uid_patch = patch('onelogin.saml2.utils.OneLogin_Saml2_Utils.generate_unique_id', return_value='TESTID') + uid_patch = patch("onelogin.saml2.utils.OneLogin_Saml2_Utils.generate_unique_id", return_value="TESTID") uid_patch.start() self.addCleanup(uid_patch.stop) # lint-amnesty, pylint: disable=no-member self._freeze_time(timestamp=1434326820) # This is the time when the saved request/response was recorded. def _freeze_time(self, timestamp): - """ Mock the current time for SAML, so we can replay canned requests/responses """ - now_patch = patch('onelogin.saml2.utils.OneLogin_Saml2_Utils.now', return_value=timestamp) + """Mock the current time for SAML, so we can replay canned requests/responses""" + now_patch = patch("onelogin.saml2.utils.OneLogin_Saml2_Utils.now", return_value=timestamp) now_patch.start() self.addCleanup(now_patch.stop) # lint-amnesty, pylint: disable=no-member def _configure_testshib_provider(self, **kwargs): - """ Enable and configure the TestShib SAML IdP as a third_party_auth provider """ - fetch_metadata = kwargs.pop('fetch_metadata', True) - assert_metadata_updates = kwargs.pop('assert_metadata_updates', True) - kwargs.setdefault('name', self.PROVIDER_NAME) - kwargs.setdefault('enabled', True) - kwargs.setdefault('visible', True) + """Enable and configure the TestShib SAML IdP as a third_party_auth provider""" + fetch_metadata = kwargs.pop("fetch_metadata", True) + assert_metadata_updates = kwargs.pop("assert_metadata_updates", True) + kwargs.setdefault("name", self.PROVIDER_NAME) + kwargs.setdefault("enabled", True) + kwargs.setdefault("visible", True) kwargs.setdefault("backend_name", "tpa-saml") - kwargs.setdefault('slug', self.PROVIDER_IDP_SLUG) - kwargs.setdefault('entity_id', TESTSHIB_ENTITY_ID) - kwargs.setdefault('metadata_source', TESTSHIB_METADATA_URL) - kwargs.setdefault('icon_class', 'fa-university') - kwargs.setdefault('attr_email', 'urn:oid:1.3.6.1.4.1.5923.1.1.1.6') # eduPersonPrincipalName - kwargs.setdefault('max_session_length', None) - kwargs.setdefault('send_to_registration_first', False) - kwargs.setdefault('skip_email_verification', False) + kwargs.setdefault("slug", self.PROVIDER_IDP_SLUG) + kwargs.setdefault("entity_id", TESTSHIB_ENTITY_ID) + kwargs.setdefault("metadata_source", TESTSHIB_METADATA_URL) + kwargs.setdefault("icon_class", "fa-university") + kwargs.setdefault("attr_email", "urn:oid:1.3.6.1.4.1.5923.1.1.1.6") # eduPersonPrincipalName + kwargs.setdefault("max_session_length", None) + kwargs.setdefault("send_to_registration_first", False) + kwargs.setdefault("skip_email_verification", False) saml_provider = self.configure_saml_provider(**kwargs) # pylint: disable=no-member if fetch_metadata: @@ -127,17 +134,17 @@ def _configure_testshib_provider(self, **kwargs): return saml_provider def do_provider_login(self, provider_redirect_url): - """ Mocked: the user logs in to TestShib and then gets redirected back """ + """Mocked: the user logs in to TestShib and then gets redirected back""" # The SAML provider (TestShib) will authenticate the user, then get the browser to POST a response: assert provider_redirect_url.startswith(TESTSHIB_SSO_URL) # lint-amnesty, pylint: disable=no-member saml_response_xml = utils.read_and_pre_process_xml( - os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data', 'testshib_saml_response.xml') + os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "testshib_saml_response.xml") ) return self.client.post( # lint-amnesty, pylint: disable=no-member self.complete_url, # lint-amnesty, pylint: disable=no-member - content_type='application/x-www-form-urlencoded', + content_type="application/x-www-form-urlencoded", data=utils.prepare_saml_response_from_xml(saml_response_xml), ) @@ -150,16 +157,16 @@ class TestIndexExceptionTest(SamlIntegrationTestUtilities, IntegrationTestMixin, """ TOKEN_RESPONSE_DATA = { - 'access_token': 'access_token_value', - 'expires_in': 'expires_in_value', + "access_token": "access_token_value", + "expires_in": "expires_in_value", } USER_RESPONSE_DATA = { - 'lastName': 'lastName_value', - 'id': 'id_value', - 'firstName': 'firstName_value', - 'idp_name': 'testshib', - 'attributes': {'urn:oid:0.9.2342.19200300.100.1.1': [], 'name_id': '1'}, - 'session_index': '1', + "lastName": "lastName_value", + "id": "id_value", + "firstName": "firstName_value", + "idp_name": "testshib", + "attributes": {"urn:oid:0.9.2342.19200300.100.1.1": [], "name_id": "1"}, + "session_index": "1", } def test_index_error_from_empty_list_saml_attribute(self): @@ -169,7 +176,8 @@ def test_index_error_from_empty_list_saml_attribute(self): """ self.provider = self._configure_testshib_provider() request, strategy = self.get_request_and_strategy( - auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete') + auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete" + ) with self.assertRaises(IncorrectConfigurationException): request.backend.auth_complete = MagicMock(return_value=self.fake_auth_complete(strategy)) @@ -188,16 +196,16 @@ class TestKeyExceptionTest(SamlIntegrationTestUtilities, IntegrationTestMixin, t """ TOKEN_RESPONSE_DATA = { - 'access_token': 'access_token_value', - 'expires_in': 'expires_in_value', + "access_token": "access_token_value", + "expires_in": "expires_in_value", } USER_RESPONSE_DATA = { - 'lastName': 'lastName_value', - 'id': 'id_value', - 'firstName': 'firstName_value', - 'idp_name': 'testshib', - 'attributes': {'name_id': '1'}, - 'session_index': '1', + "lastName": "lastName_value", + "id": "id_value", + "firstName": "firstName_value", + "idp_name": "testshib", + "attributes": {"name_id": "1"}, + "session_index": "1", } def test_key_error_from_missing_saml_attributes(self): @@ -207,7 +215,8 @@ def test_key_error_from_missing_saml_attributes(self): """ self.provider = self._configure_testshib_provider() request, strategy = self.get_request_and_strategy( - auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete') + auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete" + ) with self.assertRaises(IncorrectConfigurationException): request.backend.auth_complete = MagicMock(return_value=self.fake_auth_complete(strategy)) @@ -226,25 +235,23 @@ class TestShibIntegrationTest(SamlIntegrationTestUtilities, IntegrationTestMixin """ TOKEN_RESPONSE_DATA = { - 'access_token': 'access_token_value', - 'expires_in': 'expires_in_value', + "access_token": "access_token_value", + "expires_in": "expires_in_value", } USER_RESPONSE_DATA = { - 'lastName': 'lastName_value', - 'id': 'id_value', - 'firstName': 'firstName_value', - 'idp_name': 'testshib', - 'attributes': {'urn:oid:0.9.2342.19200300.100.1.1': ['myself'], 'name_id': '1'}, - 'session_index': '1', + "lastName": "lastName_value", + "id": "id_value", + "firstName": "firstName_value", + "idp_name": "testshib", + "attributes": {"urn:oid:0.9.2342.19200300.100.1.1": ["myself"], "name_id": "1"}, + "session_index": "1", } - @patch('openedx.features.enterprise_support.api.enterprise_customer_for_request') - @patch('openedx.core.djangoapps.user_api.accounts.settings_views.enterprise_customer_for_request') - @patch('openedx.features.enterprise_support.utils.third_party_auth.provider.Registry.get') + @patch("openedx.features.enterprise_support.api.enterprise_customer_for_request") + @patch("openedx.features.enterprise_support.utils.third_party_auth.provider.Registry.get") def test_full_pipeline_succeeds_for_unlinking_testshib_account( self, mock_auth_provider, - mock_enterprise_customer_for_request_settings_view, mock_enterprise_customer_for_request, ): @@ -252,10 +259,12 @@ def test_full_pipeline_succeeds_for_unlinking_testshib_account( # configure the backend, and mock out wire traffic. self.provider = self._configure_testshib_provider() request, strategy = self.get_request_and_strategy( - auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete') + auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri="social:complete" + ) request.backend.auth_complete = MagicMock(return_value=self.fake_auth_complete(strategy)) user = self.create_user_models_for_existing_account( - strategy, 'user@example.com', 'password', self.get_username()) + strategy, "user@example.com", "password", self.get_username() + ) self.assert_social_auth_exists_for_user(user, strategy) request.user = user @@ -267,70 +276,67 @@ def test_full_pipeline_succeeds_for_unlinking_testshib_account( enterprise_customer = EnterpriseCustomerFactory() assert EnterpriseCustomerUser.objects.count() == 0, "Precondition check: no link records should exist" EnterpriseCustomerUser.objects.link_user(enterprise_customer, user.email) - assert (EnterpriseCustomerUser.objects - .filter(enterprise_customer=enterprise_customer, user_id=user.id).count() == 1) - EnterpriseCustomerIdentityProvider.objects.get_or_create(enterprise_customer=enterprise_customer, - provider_id=self.provider.provider_id) + assert ( + EnterpriseCustomerUser.objects.filter(enterprise_customer=enterprise_customer, user_id=user.id).count() == 1 + ) + EnterpriseCustomerIdentityProvider.objects.get_or_create( + enterprise_customer=enterprise_customer, provider_id=self.provider.provider_id + ) enterprise_customer_data = { - 'uuid': enterprise_customer.uuid, - 'name': enterprise_customer.name, - 'identity_provider': 'saml-default', - 'identity_providers': [ + "uuid": enterprise_customer.uuid, + "name": enterprise_customer.name, + "identity_provider": "saml-default", + "identity_providers": [ { "provider_id": "saml-default", } ], } - mock_auth_provider.return_value.backend_name = 'tpa-saml' + mock_auth_provider.return_value.backend_name = "tpa-saml" mock_enterprise_customer_for_request.return_value = enterprise_customer_data - mock_enterprise_customer_for_request_settings_view.return_value = enterprise_customer_data # Instrument the pipeline to get to the dashboard with the full expected state. - self.client.get( - pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN)) + self.client.get(pipeline.get_login_url(self.provider.provider_id, pipeline.AUTH_ENTRY_LOGIN)) - actions.do_complete(request.backend, social_views._do_login, # pylint: disable=protected-access - request=request) + actions.do_complete( + request.backend, social_views._do_login, request=request # pylint: disable=protected-access + ) with self._patch_edxmako_current_request(strategy.request): login_user(strategy.request) - actions.do_complete(request.backend, social_views._do_login, user=user, # pylint: disable=protected-access - request=request) + actions.do_complete( + request.backend, social_views._do_login, user=user, request=request # pylint: disable=protected-access + ) # First we expect that we're in the linked state, with a backend entry. - self.assert_account_settings_context_looks_correct(account_settings_context(request), linked=True) self.assert_social_auth_exists_for_user(request.user, strategy) FEATURES_WITH_ENTERPRISE_ENABLED = settings.FEATURES.copy() - FEATURES_WITH_ENTERPRISE_ENABLED['ENABLE_ENTERPRISE_INTEGRATION'] = True + FEATURES_WITH_ENTERPRISE_ENABLED["ENABLE_ENTERPRISE_INTEGRATION"] = True with patch.dict("django.conf.settings.FEATURES", FEATURES_WITH_ENTERPRISE_ENABLED): # Fire off the disconnect pipeline without the user information. actions.do_disconnect( - request.backend, - None, - None, - redirect_field_name=auth.REDIRECT_FIELD_NAME, - request=request + request.backend, None, None, redirect_field_name=auth.REDIRECT_FIELD_NAME, request=request + ) + assert ( + EnterpriseCustomerUser.objects.filter(enterprise_customer=enterprise_customer, user_id=user.id).count() + != 0 ) - assert EnterpriseCustomerUser.objects\ - .filter(enterprise_customer=enterprise_customer, user_id=user.id).count() != 0 # Fire off the disconnect pipeline to unlink. self.assert_redirect_after_pipeline_completes( actions.do_disconnect( - request.backend, - user, - None, - redirect_field_name=auth.REDIRECT_FIELD_NAME, - request=request + request.backend, user, None, redirect_field_name=auth.REDIRECT_FIELD_NAME, request=request ) ) # Now we expect to be in the unlinked state, with no backend entry. - self.assert_account_settings_context_looks_correct(account_settings_context(request), linked=False) + self.assert_third_party_accounts_state(request, linked=False) self.assert_social_auth_does_not_exist_for_user(user, strategy) - assert EnterpriseCustomerUser.objects\ - .filter(enterprise_customer=enterprise_customer, user_id=user.id).count() == 0 + assert ( + EnterpriseCustomerUser.objects.filter(enterprise_customer=enterprise_customer, user_id=user.id).count() + == 0 + ) def get_response_data(self): """Gets dict (string -> object) of merged data about the user.""" @@ -340,7 +346,7 @@ def get_response_data(self): def get_username(self): response_data = self.get_response_data() - return response_data.get('idp_name') + return response_data.get("idp_name") def test_login_before_metadata_fetched(self): self._configure_testshib_provider(fetch_metadata=False) @@ -350,18 +356,18 @@ def test_login_before_metadata_fetched(self): try_login_response = self.client.get(testshib_login_url) # The user should be redirected to back to the login page: assert try_login_response.status_code == 302 - assert try_login_response['Location'] == self.login_page_url + assert try_login_response["Location"] == self.login_page_url # When loading the login page, the user will see an error message: response = self.client.get(self.login_page_url) - self.assertContains(response, 'Authentication with TestShib is currently unavailable.') + self.assertContains(response, "Authentication with TestShib is currently unavailable.") def test_login(self): - """ Configure TestShib before running the login test """ + """Configure TestShib before running the login test""" self._configure_testshib_provider() self._test_login() def test_register(self): - """ Configure TestShib before running the register test """ + """Configure TestShib before running the register test""" self._configure_testshib_provider() self._test_register() @@ -374,17 +380,17 @@ def test_login_records_attributes(self): user=self.user, provider=self.PROVIDER_BACKEND, uid__startswith=self.PROVIDER_IDP_SLUG ) attributes = record.extra_data - assert attributes.get('urn:oid:1.3.6.1.4.1.5923.1.1.1.9') == ['Member@testshib.org', 'Staff@testshib.org'] - assert attributes.get('urn:oid:2.5.4.3') == ['Me Myself And I'] - assert attributes.get('urn:oid:0.9.2342.19200300.100.1.1') == ['myself'] - assert attributes.get('urn:oid:2.5.4.20') == ['555-5555'] + assert attributes.get("urn:oid:1.3.6.1.4.1.5923.1.1.1.9") == ["Member@testshib.org", "Staff@testshib.org"] + assert attributes.get("urn:oid:2.5.4.3") == ["Me Myself And I"] + assert attributes.get("urn:oid:0.9.2342.19200300.100.1.1") == ["myself"] + assert attributes.get("urn:oid:2.5.4.20") == ["555-5555"] # Phone number @ddt.data(True, False) def test_debug_mode_login(self, debug_mode_enabled): - """ Test SAML login logs with debug mode enabled or not """ + """Test SAML login logs with debug mode enabled or not""" self._configure_testshib_provider(debug_mode=debug_mode_enabled) - with patch.object(saml_log, 'info') as mock_log: + with patch.object(saml_log, "info") as mock_log: self._test_login() if debug_mode_enabled: # We expect that test_login() does two full logins, and each attempt generates two @@ -393,38 +399,37 @@ def test_debug_mode_login(self, debug_mode_enabled): expected_next_url = "/dashboard" (msg, action_type, idp_name, request_data, next_url, xml), _kwargs = mock_log.call_args_list[0] - assert msg.startswith('SAML login %s') - assert action_type == 'request' + assert msg.startswith("SAML login %s") + assert action_type == "request" assert idp_name == self.PROVIDER_IDP_SLUG self.assertDictContainsSubset( - {"idp": idp_name, "auth_entry": "login", "next": expected_next_url}, - request_data + {"idp": idp_name, "auth_entry": "login", "next": expected_next_url}, request_data ) assert next_url == expected_next_url - assert '