Skip to content

Conversation

@GirlBossRush
Copy link
Contributor

@GirlBossRush GirlBossRush commented Nov 2, 2025

Details

This PR removes Spotlight from our package.json. It appears that Sentry now prefers that Spotlight runs as a Electron app or global package:

npx @spotlightjs/spotlight

The Sentry browser package now automatically includes an integration that detects when the Spotlight app is running. This lets us optionally support Spotlight without an additional package while keeping our debugging docs as is 😎

@GirlBossRush GirlBossRush added the area:frontend Features or issues related to the browser, TypeScript, Node.js, etc label Nov 2, 2025
@GirlBossRush GirlBossRush requested a review from a team as a code owner November 2, 2025 20:44
@netlify
Copy link

netlify bot commented Nov 2, 2025

Deploy Preview for authentik-storybook ready!

Name Link
🔨 Latest commit 5fce6b4
🔍 Latest deploy log https://app.netlify.com/projects/authentik-storybook/deploys/6907c2cc54d4aa000811606b
😎 Deploy Preview https://deploy-preview-17904--authentik-storybook.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@GirlBossRush GirlBossRush requested a review from BeryJu November 2, 2025 20:45
@netlify
Copy link

netlify bot commented Nov 2, 2025

Deploy Preview for authentik-integrations canceled.

Name Link
🔨 Latest commit 5fce6b4
🔍 Latest deploy log https://app.netlify.com/projects/authentik-integrations/deploys/6907c2cc525eaf0008bc4ed4

@GirlBossRush GirlBossRush moved this from Todo to Needs review in authentik Core Nov 2, 2025
@netlify
Copy link

netlify bot commented Nov 2, 2025

Deploy Preview for authentik-docs ready!

Name Link
🔨 Latest commit 5fce6b4
🔍 Latest deploy log https://app.netlify.com/projects/authentik-docs/deploys/6907c2cc3011c0000747bc9f
😎 Deploy Preview https://deploy-preview-17904--authentik-docs.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@codecov
Copy link

codecov bot commented Nov 2, 2025

❌ 3 Tests Failed:

Tests completed Failed Passed Skipped
2195 3 2192 2
View the top 1 failed test(s) by shortest run time
tests.e2e.test_provider_saml.TestProviderSAML::test_sp_initiated_implicit_post_buffer
Stack Traces | 22s run time
self = <django.db.backends.utils.CursorWrapper object at 0x7f09841285f0>
sql = 'TRUNCATE "authentik_stages_authenticator_endpoint_gdtc_endpointdevice42dc", "authentik_core_provider", "authentik_sta...tik_providers_saml_samlprovider", "authentik_stages_deny_denystage", "authentik_policies_reputation_reputationpolicy";'
params = None
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f09841285f0>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
>               return self.cursor.execute(sql)

.venv/lib/python3.13.../db/backends/utils.py:103: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [IDLE] (host=localhost user=authentik database=test_authentik) at 0x7f098ec644d0>
args = ('TRUNCATE "authentik_stages_authenticator_endpoint_gdtc_endpointdevice42dc", "authentik_core_provider", "authentik_st...k_providers_saml_samlprovider", "authentik_stages_deny_denystage", "authentik_policies_reputation_reputationpolicy";',)
kwargs = {}

    def execute(self, *args, **kwargs):
        execute_total.labels(alias, vendor).inc()
        with (
            query_duration_seconds.labels(**labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=labels),
        ):
>           return super().execute(*args, **kwargs)

.venv/lib/python3.13.../django_prometheus/db/common.py:69: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [IDLE] (host=localhost user=authentik database=test_authentik) at 0x7f098ec644d0>
query = 'TRUNCATE "authentik_stages_authenticator_endpoint_gdtc_endpointdevice42dc", "authentik_core_provider", "authentik_sta...tik_providers_saml_samlprovider", "authentik_stages_deny_denystage", "authentik_policies_reputation_reputationpolicy";'
params = None

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
                self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           psycopg.errors.DeadlockDetected: deadlock detected
E           DETAIL:  Process 490 waits for AccessExclusiveLock on relation 18241 of database 16389; blocked by process 584.
E           Process 584 waits for RowShareLock on relation 16399 of database 16389; blocked by process 490.
E           HINT:  See server log for query details.

.venv/lib/python3.13....../site-packages/psycopg/cursor.py:97: DeadlockDetected

The above exception was the direct cause of the following exception:

self = <django.core.management.commands.flush.Command object at 0x7f098db0af50>
options = {'allow_cascade': False, 'database': 'default', 'force_color': False, 'inhibit_post_migrate': False, ...}
database = 'default'
connection = <DatabaseWrapper vendor='postgresql' alias='default'>
verbosity = 0, interactive = False, reset_sequences = False
allow_cascade = False, inhibit_post_migrate = False

        def handle(self, **options):
            database = options["database"]
            connection = connections[database]
            verbosity = options["verbosity"]
            interactive = options["interactive"]
            # The following are stealth options used by Django's internals.
            reset_sequences = options.get("reset_sequences", True)
            allow_cascade = options.get("allow_cascade", False)
            inhibit_post_migrate = options.get("inhibit_post_migrate", False)
    
            self.style = no_style()
    
            # Import the 'management' module within each installed app, to register
            # dispatcher events.
            for app_config in apps.get_app_configs():
                try:
                    import_module(".management", app_config.name)
                except ImportError:
                    pass
    
            sql_list = sql_flush(
                self.style,
                connection,
                reset_sequences=reset_sequences,
                allow_cascade=allow_cascade,
            )
    
            if interactive:
                confirm = input(
                    """You have requested a flush of the database.
    This will IRREVERSIBLY DESTROY all data currently in the "%s" database,
    and return each table to an empty state.
    Are you sure you want to do this?
    
        Type 'yes' to continue, or 'no' to cancel: """
                    % connection.settings_dict["NAME"]
                )
            else:
                confirm = "yes"
    
            if confirm == "yes":
                try:
>                   connection.ops.execute_sql_flush(sql_list)

.venv/lib/python3.13.../management/commands/flush.py:74: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <psqlextra.backend.operations.PostgresOperations object at 0x7f09961cbe00>
sql_list = ['TRUNCATE "authentik_stages_authenticator_endpoint_gdtc_endpointdevice42dc", "authentik_core_provider", "authentik_st...ik_providers_saml_samlprovider", "authentik_stages_deny_denystage", "authentik_policies_reputation_reputationpolicy";']

    def execute_sql_flush(self, sql_list):
        """Execute a list of SQL statements to flush the database."""
        with transaction.atomic(
            using=self.connection.alias,
            savepoint=self.connection.features.can_rollback_ddl,
        ):
            with self.connection.cursor() as cursor:
                for sql in sql_list:
>                   cursor.execute(sql)

.venv/lib/python3.13.../backends/base/operations.py:473: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<django.db.backends.utils.CursorWrapper object at 0x7f09841285f0>, 'TRUNCATE "authentik_stages_authenticator_endpoint...ik_providers_saml_samlprovider", "authentik_stages_deny_denystage", "authentik_policies_reputation_reputationpolicy";')
kwargs = {}

    def runner(*args: "P.args", **kwargs: "P.kwargs"):
        # type: (...) -> R
        if sentry_sdk.get_client().get_integration(integration) is None:
            return original_function(*args, **kwargs)
    
>       return sentry_patched_function(*args, **kwargs)

.venv/lib/python3.13.../site-packages/sentry_sdk/utils.py:1816: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f09841285f0>
sql = 'TRUNCATE "authentik_stages_authenticator_endpoint_gdtc_endpointdevice42dc", "authentik_core_provider", "authentik_sta...tik_providers_saml_samlprovider", "authentik_stages_deny_denystage", "authentik_policies_reputation_reputationpolicy";'
params = None

    @ensure_integration_enabled(DjangoIntegration, real_execute)
    def execute(self, sql, params=None):
        # type: (CursorWrapper, Any, Optional[Any]) -> Any
        with record_sql_queries(
            cursor=self.cursor,
            query=sql,
            params_list=params,
            paramstyle="format",
            executemany=False,
            span_origin=DjangoIntegration.origin_db,
        ) as span:
            _set_db_data(span, self)
>           result = real_execute(self, sql, params)

.venv/lib/python3.13.../integrations/django/__init__.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f09841285f0>
sql = 'TRUNCATE "authentik_stages_authenticator_endpoint_gdtc_endpointdevice42dc", "authentik_core_provider", "authentik_sta...tik_providers_saml_samlprovider", "authentik_stages_deny_denystage", "authentik_policies_reputation_reputationpolicy";'
params = None

    def execute(self, sql, params=None):
>       return self._execute_with_wrappers(
            sql, params, many=False, executor=self._execute
        )

.venv/lib/python3.13.../db/backends/utils.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f09841285f0>
sql = 'TRUNCATE "authentik_stages_authenticator_endpoint_gdtc_endpointdevice42dc", "authentik_core_provider", "authentik_sta...tik_providers_saml_samlprovider", "authentik_stages_deny_denystage", "authentik_policies_reputation_reputationpolicy";'
params = None, many = False
executor = <bound method CursorWrapper._execute of <django.db.backends.utils.CursorWrapper object at 0x7f09841285f0>>

    def _execute_with_wrappers(self, sql, params, many, executor):
        context = {"connection": self.db, "cursor": self}
        for wrapper in reversed(self.db.execute_wrappers):
            executor = functools.partial(wrapper, executor)
>       return executor(sql, params, many, context)

.venv/lib/python3.13.../db/backends/utils.py:92: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f09841285f0>
sql = 'TRUNCATE "authentik_stages_authenticator_endpoint_gdtc_endpointdevice42dc", "authentik_core_provider", "authentik_sta...tik_providers_saml_samlprovider", "authentik_stages_deny_denystage", "authentik_policies_reputation_reputationpolicy";'
params = None
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f09841285f0>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
>       with self.db.wrap_database_errors:

.venv/lib/python3.13.../db/backends/utils.py:100: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.utils.DatabaseErrorWrapper object at 0x7f09925d9fd0>
exc_type = <class 'psycopg.errors.DeadlockDetected'>
exc_value = DeadlockDetected('deadlock detected\nDETAIL:  Process 490 waits for AccessExclusiveLock on relation 18241 of database ...r RowShareLock on relation 16399 of database 16389; blocked by process 490.\nHINT:  See server log for query details.')
traceback = <traceback object at 0x7f0974e0a380>

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            return
        for dj_exc_type in (
            DataError,
            OperationalError,
            IntegrityError,
            InternalError,
            ProgrammingError,
            NotSupportedError,
            DatabaseError,
            InterfaceError,
            Error,
        ):
            db_exc_type = getattr(self.wrapper.Database, dj_exc_type.__name__)
            if issubclass(exc_type, db_exc_type):
                dj_exc_value = dj_exc_type(*exc_value.args)
                # Only set the 'errors_occurred' flag for errors that may make
                # the connection unusable.
                if dj_exc_type not in (DataError, IntegrityError):
                    self.wrapper.errors_occurred = True
>               raise dj_exc_value.with_traceback(traceback) from exc_value

.venv/lib/python3.13.../django/db/utils.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.db.backends.utils.CursorWrapper object at 0x7f09841285f0>
sql = 'TRUNCATE "authentik_stages_authenticator_endpoint_gdtc_endpointdevice42dc", "authentik_core_provider", "authentik_sta...tik_providers_saml_samlprovider", "authentik_stages_deny_denystage", "authentik_policies_reputation_reputationpolicy";'
params = None
ignored_wrapper_args = (False, {'connection': <DatabaseWrapper vendor='postgresql' alias='default'>, 'cursor': <django.db.backends.utils.CursorWrapper object at 0x7f09841285f0>})

    def _execute(self, sql, params, *ignored_wrapper_args):
        # Raise a warning during app initialization (stored_app_configs is only
        # ever set during testing).
        if not apps.ready and not apps.stored_app_configs:
            warnings.warn(self.APPS_NOT_READY_WARNING_MSG, category=RuntimeWarning)
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
>               return self.cursor.execute(sql)

.venv/lib/python3.13.../db/backends/utils.py:103: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [IDLE] (host=localhost user=authentik database=test_authentik) at 0x7f098ec644d0>
args = ('TRUNCATE "authentik_stages_authenticator_endpoint_gdtc_endpointdevice42dc", "authentik_core_provider", "authentik_st...k_providers_saml_samlprovider", "authentik_stages_deny_denystage", "authentik_policies_reputation_reputationpolicy";',)
kwargs = {}

    def execute(self, *args, **kwargs):
        execute_total.labels(alias, vendor).inc()
        with (
            query_duration_seconds.labels(**labels).time(),
            ExceptionCounterByType(errors_total, extra_labels=labels),
        ):
>           return super().execute(*args, **kwargs)

.venv/lib/python3.13.../django_prometheus/db/common.py:69: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django_prometheus.db.common.ExportingCursorWrapper.<locals>.CursorWrapper [closed] [IDLE] (host=localhost user=authentik database=test_authentik) at 0x7f098ec644d0>
query = 'TRUNCATE "authentik_stages_authenticator_endpoint_gdtc_endpointdevice42dc", "authentik_core_provider", "authentik_sta...tik_providers_saml_samlprovider", "authentik_stages_deny_denystage", "authentik_policies_reputation_reputationpolicy";'
params = None

    def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            with self._conn.lock:
                self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           django.db.utils.OperationalError: deadlock detected
E           DETAIL:  Process 490 waits for AccessExclusiveLock on relation 18241 of database 16389; blocked by process 584.
E           Process 584 waits for RowShareLock on relation 16399 of database 16389; blocked by process 490.
E           HINT:  See server log for query details.

.venv/lib/python3.13....../site-packages/psycopg/cursor.py:97: OperationalError

The above exception was the direct cause of the following exception:

self = <tests.e2e.test_provider_saml.TestProviderSAML testMethod=test_sp_initiated_implicit_post_buffer>
result = <TestCaseFunction test_sp_initiated_implicit_post_buffer>
debug = False

    def _setup_and_call(self, result, debug=False):
        """
        Perform the following in order: pre-setup, run test, post-teardown,
        skipping pre/post hooks if test is set to be skipped.
    
        If debug=True, reraise any errors in setup and use super().debug()
        instead of __call__() to run the test.
        """
        testMethod = getattr(self, self._testMethodName)
        skipped = getattr(self.__class__, "__unittest_skip__", False) or getattr(
            testMethod, "__unittest_skip__", False
        )
    
        # Convert async test methods.
        if iscoroutinefunction(testMethod):
            setattr(self, self._testMethodName, async_to_sync(testMethod))
    
        if not skipped:
            try:
                if self.__class__._pre_setup_ran_eagerly:
                    self.__class__._pre_setup_ran_eagerly = False
                else:
                    self._pre_setup()
            except Exception:
                if debug:
                    raise
                result.addError(self, sys.exc_info())
                return
        if debug:
            super().debug()
        else:
            super().__call__(result)
        if not skipped:
            try:
>               self._post_teardown()

.venv/lib/python3.13.../django/test/testcases.py:379: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_saml.TestProviderSAML testMethod=test_sp_initiated_implicit_post_buffer>

    def _post_teardown(self):
        """
        Perform post-test things:
        * Flush the contents of the database to leave a clean slate. If the
          class has an 'available_apps' attribute, don't fire post_migrate.
        * Force-close the connection so the next test gets a clean cursor.
        """
        try:
>           self._fixture_teardown()

.venv/lib/python3.13.../django/test/testcases.py:1231: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_saml.TestProviderSAML testMethod=test_sp_initiated_implicit_post_buffer>

    def _fixture_teardown(self):
        # Allow TRUNCATE ... CASCADE and don't emit the post_migrate signal
        # when flushing only a subset of the apps
        for db_name in self._databases_names(include_mirrors=False):
            # Flush the database
            inhibit_post_migrate = (
                self.available_apps is not None
                or (  # Inhibit the post_migrate signal when using serialized
                    # rollback to avoid trying to recreate the serialized data.
                    self.serialized_rollback
                    and hasattr(connections[db_name], "_test_serialized_contents")
                )
            )
>           call_command(
                "flush",
                verbosity=0,
                interactive=False,
                database=db_name,
                reset_sequences=False,
                allow_cascade=self.available_apps is not None,
                inhibit_post_migrate=inhibit_post_migrate,
            )

.venv/lib/python3.13.../django/test/testcases.py:1266: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

command_name = 'flush', args = ()
options = {'allow_cascade': False, 'database': 'default', 'inhibit_post_migrate': False, 'interactive': False, ...}
command = <django.core.management.commands.flush.Command object at 0x7f098db0af50>
app_name = 'django.core'
parser = CommandParser(prog=' flush', usage=None, description='Removes ALL DATA from the database, including data added during ....', formatter_class=<class 'django.core.management.base.DjangoHelpFormatter'>, conflict_handler='error', add_help=True)
opt_mapping = {'database': 'database', 'force_color': 'force_color', 'help': 'help', 'no_color': 'no_color', ...}
arg_options = {'allow_cascade': False, 'database': 'default', 'inhibit_post_migrate': False, 'interactive': False, ...}
parse_args = []

    def call_command(command_name, *args, **options):
        """
        Call the given command, with the given options and args/kwargs.
    
        This is the primary API you should use for calling specific commands.
    
        `command_name` may be a string or a command object. Using a string is
        preferred unless the command object is required for further processing or
        testing.
    
        Some examples:
            call_command('migrate')
            call_command('shell', plain=True)
            call_command('sqlmigrate', 'myapp')
    
            from django.core.management.commands import flush
            cmd = flush.Command()
            call_command(cmd, verbosity=0, interactive=False)
            # Do something with cmd ...
        """
        if isinstance(command_name, BaseCommand):
            # Command object passed in.
            command = command_name
            command_name = command.__class__.__module__.split(".")[-1]
        else:
            # Load the command object by name.
            try:
                app_name = get_commands()[command_name]
            except KeyError:
                raise CommandError("Unknown command: %r" % command_name)
    
            if isinstance(app_name, BaseCommand):
                # If the command is already loaded, use it directly.
                command = app_name
            else:
                command = load_command_class(app_name, command_name)
    
        # Simulate argument parsing to get the option defaults (see #10080 for details).
        parser = command.create_parser("", command_name)
        # Use the `dest` option name from the parser option
        opt_mapping = {
            min(s_opt.option_strings).lstrip("-").replace("-", "_"): s_opt.dest
            for s_opt in parser._actions
            if s_opt.option_strings
        }
        arg_options = {opt_mapping.get(key, key): value for key, value in options.items()}
        parse_args = []
        for arg in args:
            if isinstance(arg, (list, tuple)):
                parse_args += map(str, arg)
            else:
                parse_args.append(str(arg))
    
        def get_actions(parser):
            # Parser actions and actions from sub-parser choices.
            for opt in parser._actions:
                if isinstance(opt, _SubParsersAction):
                    for sub_opt in opt.choices.values():
                        yield from get_actions(sub_opt)
                else:
                    yield opt
    
        parser_actions = list(get_actions(parser))
        mutually_exclusive_required_options = {
            opt
            for group in parser._mutually_exclusive_groups
            for opt in group._group_actions
            if group.required
        }
        # Any required arguments which are passed in via **options must be passed
        # to parse_args().
        for opt in parser_actions:
            if opt.dest in options and (
                opt.required or opt in mutually_exclusive_required_options
            ):
                opt_dest_count = sum(v == opt.dest for v in opt_mapping.values())
                if opt_dest_count > 1:
                    raise TypeError(
                        f"Cannot pass the dest {opt.dest!r} that matches multiple "
                        f"arguments via **options."
                    )
                parse_args.append(min(opt.option_strings))
                if isinstance(opt, (_AppendConstAction, _CountAction, _StoreConstAction)):
                    continue
                value = arg_options[opt.dest]
                if isinstance(value, (list, tuple)):
                    parse_args += map(str, value)
                else:
                    parse_args.append(str(value))
        defaults = parser.parse_args(args=parse_args)
        defaults = dict(defaults._get_kwargs(), **arg_options)
        # Raise an error if any unknown options were passed.
        stealth_options = set(command.base_stealth_options + command.stealth_options)
        dest_parameters = {action.dest for action in parser_actions}
        valid_options = (dest_parameters | stealth_options).union(opt_mapping)
        unknown_options = set(options) - valid_options
        if unknown_options:
            raise TypeError(
                "Unknown option(s) for %s command: %s. "
                "Valid options are: %s."
                % (
                    command_name,
                    ", ".join(sorted(unknown_options)),
                    ", ".join(sorted(valid_options)),
                )
            )
        # Move positional args out of options to mimic legacy optparse
        args = defaults.pop("args", ())
        if "skip_checks" not in options:
            defaults["skip_checks"] = True
    
>       return command.execute(*args, **defaults)

.venv/lib/python3.13.../core/management/__init__.py:194: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.core.management.commands.flush.Command object at 0x7f098db0af50>
args = ()
options = {'allow_cascade': False, 'database': 'default', 'force_color': False, 'inhibit_post_migrate': False, ...}

    def execute(self, *args, **options):
        """
        Try to execute this command, performing system checks if needed (as
        controlled by the ``requires_system_checks`` attribute, except if
        force-skipped).
        """
        if options["force_color"] and options["no_color"]:
            raise CommandError(
                "The --no-color and --force-color options can't be used together."
            )
        if options["force_color"]:
            self.style = color_style(force_color=True)
        elif options["no_color"]:
            self.style = no_style()
            self.stderr.style_func = None
        if options.get("stdout"):
            self.stdout = OutputWrapper(options["stdout"])
        if options.get("stderr"):
            self.stderr = OutputWrapper(options["stderr"])
    
        if self.requires_system_checks and not options["skip_checks"]:
            check_kwargs = self.get_check_kwargs(options)
            self.check(**check_kwargs)
        if self.requires_migrations_checks:
            self.check_migrations()
>       output = self.handle(*args, **options)

.venv/lib/python3.13.../core/management/base.py:460: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <django.core.management.commands.flush.Command object at 0x7f098db0af50>
options = {'allow_cascade': False, 'database': 'default', 'force_color': False, 'inhibit_post_migrate': False, ...}
database = 'default'
connection = <DatabaseWrapper vendor='postgresql' alias='default'>
verbosity = 0, interactive = False, reset_sequences = False
allow_cascade = False, inhibit_post_migrate = False

        def handle(self, **options):
            database = options["database"]
            connection = connections[database]
            verbosity = options["verbosity"]
            interactive = options["interactive"]
            # The following are stealth options used by Django's internals.
            reset_sequences = options.get("reset_sequences", True)
            allow_cascade = options.get("allow_cascade", False)
            inhibit_post_migrate = options.get("inhibit_post_migrate", False)
    
            self.style = no_style()
    
            # Import the 'management' module within each installed app, to register
            # dispatcher events.
            for app_config in apps.get_app_configs():
                try:
                    import_module(".management", app_config.name)
                except ImportError:
                    pass
    
            sql_list = sql_flush(
                self.style,
                connection,
                reset_sequences=reset_sequences,
                allow_cascade=allow_cascade,
            )
    
            if interactive:
                confirm = input(
                    """You have requested a flush of the database.
    This will IRREVERSIBLY DESTROY all data currently in the "%s" database,
    and return each table to an empty state.
    Are you sure you want to do this?
    
        Type 'yes' to continue, or 'no' to cancel: """
                    % connection.settings_dict["NAME"]
                )
            else:
                confirm = "yes"
    
            if confirm == "yes":
                try:
                    connection.ops.execute_sql_flush(sql_list)
                except Exception as exc:
>                   raise CommandError(
                        "Database %s couldn't be flushed. Possible reasons:\n"
                        "  * The database isn't running or isn't configured correctly.\n"
                        "  * At least one of the expected database tables doesn't exist.\n"
                        "  * The SQL was invalid.\n"
                        "Hint: Look at the output of 'django-admin sqlflush'. "
                        "That's the SQL this command wasn't able to run."
                        % (connection.settings_dict["NAME"],)
                    ) from exc
E                   django.core.management.base.CommandError: Database test_authentik couldn't be flushed. Possible reasons:
E                     * The database isn't running or isn't configured correctly.
E                     * At least one of the expected database tables doesn't exist.
E                     * The SQL was invalid.
E                   Hint: Look at the output of 'django-admin sqlflush'. That's the SQL this command wasn't able to run.

.venv/lib/python3.13.../management/commands/flush.py:76: CommandError
View the full list of 2 ❄️ flaky test(s)
tests.e2e.test_provider_radius.TestProviderRadius::test_radius_bind_fail

Flake rate in main: 100.00% (Passed 0 times, Failed 57 times)

Stack Traces | 95s run time
self = <asgiref.sync.AsyncToSync object at 0x7f5738482a50>
call_result = <Future at 0x7f57451a4c50 state=finished returned NoneType>
exc_info = (<class 'pyrad.client.Timeout'>, Timeout(), <traceback object at 0x7f573d37e740>)
task_context = None, context = [<_contextvars.Context object at 0x7f573d9427c0>]
awaitable = <coroutine object _async_proxy at 0x7f57383aee30>

    async def main_wrap(
        self,
        call_result: "Future[_R]",
        exc_info: "OptExcInfo",
        task_context: "Optional[List[asyncio.Task[Any]]]",
        context: List[contextvars.Context],
        awaitable: Union[Coroutine[Any, Any, _R], Awaitable[_R]],
    ) -> None:
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.
        """
    
        __traceback_hide__ = True  # noqa: F841
    
        if context is not None:
            _restore_context(context[0])
    
        current_task = asyncio.current_task()
        if current_task is not None and task_context is not None:
            task_context.append(current_task)
    
        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
>                   raise exc_info[1]

.venv/lib/python3.13............/site-packages/asgiref/sync.py:361: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asgiref.sync.AsyncToSync object at 0x7f573ca00850>
call_result = <Future at 0x7f573c9c8450 state=finished returned NoneType>
exc_info = (<class 'pyrad.client.Timeout'>, Timeout(), <traceback object at 0x7f573db0aa40>)
task_context = None, context = [<_contextvars.Context object at 0x7f57382a0380>]
awaitable = <coroutine object _async_proxy at 0x7f573cf4cc70>

    async def main_wrap(
        self,
        call_result: "Future[_R]",
        exc_info: "OptExcInfo",
        task_context: "Optional[List[asyncio.Task[Any]]]",
        context: List[contextvars.Context],
        awaitable: Union[Coroutine[Any, Any, _R], Awaitable[_R]],
    ) -> None:
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.
        """
    
        __traceback_hide__ = True  # noqa: F841
    
        if context is not None:
            _restore_context(context[0])
    
        current_task = asyncio.current_task()
        if current_task is not None and task_context is not None:
            task_context.append(current_task)
    
        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
>                   raise exc_info[1]

.venv/lib/python3.13............/site-packages/asgiref/sync.py:361: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>

    @retry(exceptions=[Timeout])
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_radius_bind_fail(self):
        """Test simple bind (failed)"""
        self._prepare()
        srv = Client(
            server="localhost",
            secret=self.shared_secret.encode(),
            dict=Dictionary(".../radius/dictionaries/dictionary"),
        )
    
        req = srv.CreateAuthPacket(
            code=AccessRequest, User_Name=self.user.username, NAS_Identifier="localhost"
        )
        req["User-Password"] = req.PwCrypt(self.user.username + "foo")
    
>       reply = srv.SendPacket(req)

tests/e2e/test_provider_radius.py:98: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyrad.client.Client object at 0x7f573e245810>
pkt = AuthPacket({'User-Name': ['9OleamrlOg197U2IEutX'], 'NAS-Identifier': ['localhost'], 'User-Password': [b'LGY\x19\xbfhj\xe9\xdd\xcf\x87\x81\x0f\xe7\xe1=1\xbf\x11\x00\x18.%\xe3\xb5\xd8\xff\xe74\xa0E\x1e']})

    def SendPacket(self, pkt):
        """Send a packet to a RADIUS server.
    
        :param pkt: the packet to send
        :type pkt:  pyrad.packet.Packet
        :return:    the reply packet received
        :rtype:     pyrad.packet.Packet
        :raise Timeout: RADIUS server does not reply
        """
        if isinstance(pkt, packet.AuthPacket):
            if pkt.auth_type == 'eap-md5':
                # Creating EAP-Identity
                password = pkt[2][0] if 2 in pkt else pkt[1][0]
                pkt[79] = [struct.pack('!BBHB%ds' % len(password),
                                       EAP_CODE_RESPONSE,
                                       packet.CurrentID,
                                       len(password) + 5,
                                       EAP_TYPE_IDENTITY,
                                       password)]
>           reply = self._SendPacket(pkt, self.authport)

.venv/lib/python3.13................../site-packages/pyrad/client.py:194: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyrad.client.Client object at 0x7f573e245810>
pkt = AuthPacket({'User-Name': ['9OleamrlOg197U2IEutX'], 'NAS-Identifier': ['localhost'], 'User-Password': [b'LGY\x19\xbfhj\xe9\xdd\xcf\x87\x81\x0f\xe7\xe1=1\xbf\x11\x00\x18.%\xe3\xb5\xd8\xff\xe74\xa0E\x1e']})
port = 1812

    def _SendPacket(self, pkt, port):
        """Send a packet to a RADIUS server.
    
        :param pkt:  the packet to send
        :type pkt:   pyrad.packet.Packet
        :param port: UDP port to send packet to
        :type port:  integer
        :return:     the reply packet received
        :rtype:      pyrad.packet.Packet
        :raise Timeout: RADIUS server does not reply
        """
        self._SocketOpen()
    
        for attempt in range(self.retries):
            if attempt and pkt.code == packet.AccountingRequest:
                if "Acct-Delay-Time" in pkt:
                    pkt["Acct-Delay-Time"] = \
                            pkt["Acct-Delay-Time"][0] + self.timeout
                else:
                    pkt["Acct-Delay-Time"] = self.timeout
    
            now = time.time()
            waitto = now + self.timeout
    
            self._socket.sendto(pkt.RequestPacket(), (self.server, port))
    
            while now < waitto:
                ready = self._poll.poll((waitto - now) * 1000)
    
                if ready:
                    rawreply = self._socket.recv(4096)
                else:
                    now = time.time()
                    continue
    
                try:
                    reply = pkt.CreateReply(packet=rawreply)
                    if pkt.VerifyReply(reply, rawreply):
                        return reply
                except packet.PacketError:
                    pass
    
                now = time.time()
    
>       raise Timeout
E       pyrad.client.Timeout

.venv/lib/python3.13................../site-packages/pyrad/client.py:173: Timeout

During handling of the above exception, another exception occurred:

self = <asgiref.sync.AsyncToSync object at 0x7f57382595e0>
call_result = <Future at 0x7f573d3b0050 state=finished returned NoneType>
exc_info = (<class 'pyrad.client.Timeout'>, Timeout(), <traceback object at 0x7f573ceb8c80>)
task_context = None, context = [<_contextvars.Context object at 0x7f57383f6a00>]
awaitable = <coroutine object _async_proxy at 0x7f573d7de980>

    async def main_wrap(
        self,
        call_result: "Future[_R]",
        exc_info: "OptExcInfo",
        task_context: "Optional[List[asyncio.Task[Any]]]",
        context: List[contextvars.Context],
        awaitable: Union[Coroutine[Any, Any, _R], Awaitable[_R]],
    ) -> None:
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.
        """
    
        __traceback_hide__ = True  # noqa: F841
    
        if context is not None:
            _restore_context(context[0])
    
        current_task = asyncio.current_task()
        if current_task is not None and task_context is not None:
            task_context.append(current_task)
    
        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
>                   raise exc_info[1]

.venv/lib/python3.13............/site-packages/asgiref/sync.py:361: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asgiref.sync.AsyncToSync object at 0x7f573d3577a0>
call_result = <Future at 0x7f573e863350 state=finished returned NoneType>
exc_info = (<class 'pyrad.client.Timeout'>, Timeout(), <traceback object at 0x7f573da47580>)
task_context = None, context = [<_contextvars.Context object at 0x7f573d293600>]
awaitable = <coroutine object _async_proxy at 0x7f573d354f40>

    async def main_wrap(
        self,
        call_result: "Future[_R]",
        exc_info: "OptExcInfo",
        task_context: "Optional[List[asyncio.Task[Any]]]",
        context: List[contextvars.Context],
        awaitable: Union[Coroutine[Any, Any, _R], Awaitable[_R]],
    ) -> None:
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.
        """
    
        __traceback_hide__ = True  # noqa: F841
    
        if context is not None:
            _restore_context(context[0])
    
        current_task = asyncio.current_task()
        if current_task is not None and task_context is not None:
            task_context.append(current_task)
    
        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
>                   raise exc_info[1]

.venv/lib/python3.13............/site-packages/asgiref/sync.py:361: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>

    @retry(exceptions=[Timeout])
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_radius_bind_fail(self):
        """Test simple bind (failed)"""
        self._prepare()
        srv = Client(
            server="localhost",
            secret=self.shared_secret.encode(),
            dict=Dictionary(".../radius/dictionaries/dictionary"),
        )
    
        req = srv.CreateAuthPacket(
            code=AccessRequest, User_Name=self.user.username, NAS_Identifier="localhost"
        )
        req["User-Password"] = req.PwCrypt(self.user.username + "foo")
    
>       reply = srv.SendPacket(req)

tests/e2e/test_provider_radius.py:98: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyrad.client.Client object at 0x7f573d778640>
pkt = AuthPacket({'User-Name': ['VBpUWCRCS4GYsxxmKBOd'], 'NAS-Identifier': ['localhost'], 'User-Password': [b"'\xe1;3p\xc9<Lg\xf6\xc5\x1d{\xa3\x17\xe3]\xb0\xe6j\xb7L0\xf5\xab\xc3\x82\x01E;tC"]})

    def SendPacket(self, pkt):
        """Send a packet to a RADIUS server.
    
        :param pkt: the packet to send
        :type pkt:  pyrad.packet.Packet
        :return:    the reply packet received
        :rtype:     pyrad.packet.Packet
        :raise Timeout: RADIUS server does not reply
        """
        if isinstance(pkt, packet.AuthPacket):
            if pkt.auth_type == 'eap-md5':
                # Creating EAP-Identity
                password = pkt[2][0] if 2 in pkt else pkt[1][0]
                pkt[79] = [struct.pack('!BBHB%ds' % len(password),
                                       EAP_CODE_RESPONSE,
                                       packet.CurrentID,
                                       len(password) + 5,
                                       EAP_TYPE_IDENTITY,
                                       password)]
>           reply = self._SendPacket(pkt, self.authport)

.venv/lib/python3.13................../site-packages/pyrad/client.py:194: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyrad.client.Client object at 0x7f573d778640>
pkt = AuthPacket({'User-Name': ['VBpUWCRCS4GYsxxmKBOd'], 'NAS-Identifier': ['localhost'], 'User-Password': [b"'\xe1;3p\xc9<Lg\xf6\xc5\x1d{\xa3\x17\xe3]\xb0\xe6j\xb7L0\xf5\xab\xc3\x82\x01E;tC"]})
port = 1812

    def _SendPacket(self, pkt, port):
        """Send a packet to a RADIUS server.
    
        :param pkt:  the packet to send
        :type pkt:   pyrad.packet.Packet
        :param port: UDP port to send packet to
        :type port:  integer
        :return:     the reply packet received
        :rtype:      pyrad.packet.Packet
        :raise Timeout: RADIUS server does not reply
        """
        self._SocketOpen()
    
        for attempt in range(self.retries):
            if attempt and pkt.code == packet.AccountingRequest:
                if "Acct-Delay-Time" in pkt:
                    pkt["Acct-Delay-Time"] = \
                            pkt["Acct-Delay-Time"][0] + self.timeout
                else:
                    pkt["Acct-Delay-Time"] = self.timeout
    
            now = time.time()
            waitto = now + self.timeout
    
            self._socket.sendto(pkt.RequestPacket(), (self.server, port))
    
            while now < waitto:
                ready = self._poll.poll((waitto - now) * 1000)
    
                if ready:
                    rawreply = self._socket.recv(4096)
                else:
                    now = time.time()
                    continue
    
                try:
                    reply = pkt.CreateReply(packet=rawreply)
                    if pkt.VerifyReply(reply, rawreply):
                        return reply
                except packet.PacketError:
                    pass
    
                now = time.time()
    
>       raise Timeout
E       pyrad.client.Timeout

.venv/lib/python3.13................../site-packages/pyrad/client.py:173: Timeout

During handling of the above exception, another exception occurred:

self = <unittest.case._Outcome object at 0x7f573d776990>
test_case = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.9........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
result = <TestCaseFunction test_radius_bind_fail>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.9........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
method = <bound method TestProviderRadius.test_radius_bind_fail of <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.9........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
                raise exc
            logger.debug("Retrying on error", exc=exc, test=self)
            self.tearDown()
            self._post_teardown()
            self._pre_setup()
            self.setUp()
>           return wrapper(self, *args, **kwargs)

tests/e2e/utils.py:337: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
                raise exc
            logger.debug("Retrying on error", exc=exc, test=self)
            self.tearDown()
            self._post_teardown()
            self._pre_setup()
            self.setUp()
>           return wrapper(self, *args, **kwargs)

tests/e2e/utils.py:337: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
>               raise exc

tests/e2e/utils.py:331: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_fail>

    @retry(exceptions=[Timeout])
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_radius_bind_fail(self):
        """Test simple bind (failed)"""
        self._prepare()
        srv = Client(
            server="localhost",
            secret=self.shared_secret.encode(),
            dict=Dictionary(".../radius/dictionaries/dictionary"),
        )
    
        req = srv.CreateAuthPacket(
            code=AccessRequest, User_Name=self.user.username, NAS_Identifier="localhost"
        )
        req["User-Password"] = req.PwCrypt(self.user.username + "foo")
    
>       reply = srv.SendPacket(req)

tests/e2e/test_provider_radius.py:98: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyrad.client.Client object at 0x7f573d8b6f90>
pkt = AuthPacket({'User-Name': ['iVtFZtGD60ToaJqyw7Gn'], 'NAS-Identifier': ['localhost'], 'User-Password': [b'$E?Vq\xd3\x8cI\xbc\x1fx\x07\xec?\x9b\xfb*y-\xba`\xa1g\r\x0fC\xaf\x99#q[\x8b']})

    def SendPacket(self, pkt):
        """Send a packet to a RADIUS server.
    
        :param pkt: the packet to send
        :type pkt:  pyrad.packet.Packet
        :return:    the reply packet received
        :rtype:     pyrad.packet.Packet
        :raise Timeout: RADIUS server does not reply
        """
        if isinstance(pkt, packet.AuthPacket):
            if pkt.auth_type == 'eap-md5':
                # Creating EAP-Identity
                password = pkt[2][0] if 2 in pkt else pkt[1][0]
                pkt[79] = [struct.pack('!BBHB%ds' % len(password),
                                       EAP_CODE_RESPONSE,
                                       packet.CurrentID,
                                       len(password) + 5,
                                       EAP_TYPE_IDENTITY,
                                       password)]
>           reply = self._SendPacket(pkt, self.authport)

.venv/lib/python3.13................../site-packages/pyrad/client.py:194: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyrad.client.Client object at 0x7f573d8b6f90>
pkt = AuthPacket({'User-Name': ['iVtFZtGD60ToaJqyw7Gn'], 'NAS-Identifier': ['localhost'], 'User-Password': [b'$E?Vq\xd3\x8cI\xbc\x1fx\x07\xec?\x9b\xfb*y-\xba`\xa1g\r\x0fC\xaf\x99#q[\x8b']})
port = 1812

    def _SendPacket(self, pkt, port):
        """Send a packet to a RADIUS server.
    
        :param pkt:  the packet to send
        :type pkt:   pyrad.packet.Packet
        :param port: UDP port to send packet to
        :type port:  integer
        :return:     the reply packet received
        :rtype:      pyrad.packet.Packet
        :raise Timeout: RADIUS server does not reply
        """
        self._SocketOpen()
    
        for attempt in range(self.retries):
            if attempt and pkt.code == packet.AccountingRequest:
                if "Acct-Delay-Time" in pkt:
                    pkt["Acct-Delay-Time"] = \
                            pkt["Acct-Delay-Time"][0] + self.timeout
                else:
                    pkt["Acct-Delay-Time"] = self.timeout
    
            now = time.time()
            waitto = now + self.timeout
    
            self._socket.sendto(pkt.RequestPacket(), (self.server, port))
    
            while now < waitto:
                ready = self._poll.poll((waitto - now) * 1000)
    
                if ready:
                    rawreply = self._socket.recv(4096)
                else:
                    now = time.time()
                    continue
    
                try:
                    reply = pkt.CreateReply(packet=rawreply)
                    if pkt.VerifyReply(reply, rawreply):
                        return reply
                except packet.PacketError:
                    pass
    
                now = time.time()
    
>       raise Timeout
E       pyrad.client.Timeout

.venv/lib/python3.13................../site-packages/pyrad/client.py:173: Timeout
tests.e2e.test_provider_radius.TestProviderRadius::test_radius_bind_success

Flake rate in main: 100.00% (Passed 0 times, Failed 57 times)

Stack Traces | 177s run time
self = <asgiref.sync.AsyncToSync object at 0x7f573dab0640>
call_result = <Future at 0x7f573e2df650 state=finished returned NoneType>
exc_info = (<class 'pyrad.client.Timeout'>, Timeout(), <traceback object at 0x7f57383d9d80>)
task_context = None, context = [<_contextvars.Context object at 0x7f573d6e0580>]
awaitable = <coroutine object _async_proxy at 0x7f573823ed40>

    async def main_wrap(
        self,
        call_result: "Future[_R]",
        exc_info: "OptExcInfo",
        task_context: "Optional[List[asyncio.Task[Any]]]",
        context: List[contextvars.Context],
        awaitable: Union[Coroutine[Any, Any, _R], Awaitable[_R]],
    ) -> None:
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.
        """
    
        __traceback_hide__ = True  # noqa: F841
    
        if context is not None:
            _restore_context(context[0])
    
        current_task = asyncio.current_task()
        if current_task is not None and task_context is not None:
            task_context.append(current_task)
    
        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
>                   raise exc_info[1]

.venv/lib/python3.13............/site-packages/asgiref/sync.py:361: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asgiref.sync.AsyncToSync object at 0x7f5738377ed0>
call_result = <Future at 0x7f5744fa6cf0 state=finished returned NoneType>
exc_info = (<class 'pyrad.client.Timeout'>, Timeout(), <traceback object at 0x7f5738476880>)
task_context = None, context = [<_contextvars.Context object at 0x7f573ca60080>]
awaitable = <coroutine object _async_proxy at 0x7f573823d8a0>

    async def main_wrap(
        self,
        call_result: "Future[_R]",
        exc_info: "OptExcInfo",
        task_context: "Optional[List[asyncio.Task[Any]]]",
        context: List[contextvars.Context],
        awaitable: Union[Coroutine[Any, Any, _R], Awaitable[_R]],
    ) -> None:
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.
        """
    
        __traceback_hide__ = True  # noqa: F841
    
        if context is not None:
            _restore_context(context[0])
    
        current_task = asyncio.current_task()
        if current_task is not None and task_context is not None:
            task_context.append(current_task)
    
        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
>                   raise exc_info[1]

.venv/lib/python3.13............/site-packages/asgiref/sync.py:361: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>

    @retry(exceptions=[Timeout])
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_radius_bind_success(self):
        """Test simple bind"""
        self._prepare()
        srv = Client(
            server="localhost",
            secret=self.shared_secret.encode(),
            dict=Dictionary(".../radius/dictionaries/dictionary"),
        )
    
        req = srv.CreateAuthPacket(
            code=AccessRequest, User_Name=self.user.username, NAS_Identifier="localhost"
        )
        req["User-Password"] = req.PwCrypt(self.user.username)
    
>       reply = srv.SendPacket(req)

tests/e2e/test_provider_radius.py:76: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyrad.client.Client object at 0x7f573846d940>
pkt = AuthPacket({'User-Name': ['t8ugEZTb8UtDVX7lSpZd'], 'NAS-Identifier': ['localhost'], 'User-Password': [b'\xc5\x8bl\tSN)\xc9lH#\x1f\x04\xf4^\xcd.\x07\xa0m}\xe8SG\x111w\xcf\x9a\x15\xde\x13']})

    def SendPacket(self, pkt):
        """Send a packet to a RADIUS server.
    
        :param pkt: the packet to send
        :type pkt:  pyrad.packet.Packet
        :return:    the reply packet received
        :rtype:     pyrad.packet.Packet
        :raise Timeout: RADIUS server does not reply
        """
        if isinstance(pkt, packet.AuthPacket):
            if pkt.auth_type == 'eap-md5':
                # Creating EAP-Identity
                password = pkt[2][0] if 2 in pkt else pkt[1][0]
                pkt[79] = [struct.pack('!BBHB%ds' % len(password),
                                       EAP_CODE_RESPONSE,
                                       packet.CurrentID,
                                       len(password) + 5,
                                       EAP_TYPE_IDENTITY,
                                       password)]
>           reply = self._SendPacket(pkt, self.authport)

.venv/lib/python3.13................../site-packages/pyrad/client.py:194: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyrad.client.Client object at 0x7f573846d940>
pkt = AuthPacket({'User-Name': ['t8ugEZTb8UtDVX7lSpZd'], 'NAS-Identifier': ['localhost'], 'User-Password': [b'\xc5\x8bl\tSN)\xc9lH#\x1f\x04\xf4^\xcd.\x07\xa0m}\xe8SG\x111w\xcf\x9a\x15\xde\x13']})
port = 1812

    def _SendPacket(self, pkt, port):
        """Send a packet to a RADIUS server.
    
        :param pkt:  the packet to send
        :type pkt:   pyrad.packet.Packet
        :param port: UDP port to send packet to
        :type port:  integer
        :return:     the reply packet received
        :rtype:      pyrad.packet.Packet
        :raise Timeout: RADIUS server does not reply
        """
        self._SocketOpen()
    
        for attempt in range(self.retries):
            if attempt and pkt.code == packet.AccountingRequest:
                if "Acct-Delay-Time" in pkt:
                    pkt["Acct-Delay-Time"] = \
                            pkt["Acct-Delay-Time"][0] + self.timeout
                else:
                    pkt["Acct-Delay-Time"] = self.timeout
    
            now = time.time()
            waitto = now + self.timeout
    
            self._socket.sendto(pkt.RequestPacket(), (self.server, port))
    
            while now < waitto:
                ready = self._poll.poll((waitto - now) * 1000)
    
                if ready:
                    rawreply = self._socket.recv(4096)
                else:
                    now = time.time()
                    continue
    
                try:
                    reply = pkt.CreateReply(packet=rawreply)
                    if pkt.VerifyReply(reply, rawreply):
                        return reply
                except packet.PacketError:
                    pass
    
                now = time.time()
    
>       raise Timeout
E       pyrad.client.Timeout

.venv/lib/python3.13................../site-packages/pyrad/client.py:173: Timeout

During handling of the above exception, another exception occurred:

self = <asgiref.sync.AsyncToSync object at 0x7f573d119a30>
call_result = <Future at 0x7f573dac3290 state=finished returned NoneType>
exc_info = (<class 'pyrad.client.Timeout'>, Timeout(), <traceback object at 0x7f57383dd4c0>)
task_context = None, context = [<_contextvars.Context object at 0x7f573cfc4600>]
awaitable = <coroutine object _async_proxy at 0x7f573825b010>

    async def main_wrap(
        self,
        call_result: "Future[_R]",
        exc_info: "OptExcInfo",
        task_context: "Optional[List[asyncio.Task[Any]]]",
        context: List[contextvars.Context],
        awaitable: Union[Coroutine[Any, Any, _R], Awaitable[_R]],
    ) -> None:
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.
        """
    
        __traceback_hide__ = True  # noqa: F841
    
        if context is not None:
            _restore_context(context[0])
    
        current_task = asyncio.current_task()
        if current_task is not None and task_context is not None:
            task_context.append(current_task)
    
        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
>                   raise exc_info[1]

.venv/lib/python3.13............/site-packages/asgiref/sync.py:361: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asgiref.sync.AsyncToSync object at 0x7f573e494c30>
call_result = <Future at 0x7f573ccc2cf0 state=finished returned NoneType>
exc_info = (<class 'pyrad.client.Timeout'>, Timeout(), <traceback object at 0x7f5744dc46c0>)
task_context = None, context = [<_contextvars.Context object at 0x7f57382a20c0>]
awaitable = <coroutine object _async_proxy at 0x7f573823c5e0>

    async def main_wrap(
        self,
        call_result: "Future[_R]",
        exc_info: "OptExcInfo",
        task_context: "Optional[List[asyncio.Task[Any]]]",
        context: List[contextvars.Context],
        awaitable: Union[Coroutine[Any, Any, _R], Awaitable[_R]],
    ) -> None:
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.
        """
    
        __traceback_hide__ = True  # noqa: F841
    
        if context is not None:
            _restore_context(context[0])
    
        current_task = asyncio.current_task()
        if current_task is not None and task_context is not None:
            task_context.append(current_task)
    
        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
>                   raise exc_info[1]

.venv/lib/python3.13............/site-packages/asgiref/sync.py:361: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>

    @retry(exceptions=[Timeout])
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_radius_bind_success(self):
        """Test simple bind"""
        self._prepare()
        srv = Client(
            server="localhost",
            secret=self.shared_secret.encode(),
            dict=Dictionary(".../radius/dictionaries/dictionary"),
        )
    
        req = srv.CreateAuthPacket(
            code=AccessRequest, User_Name=self.user.username, NAS_Identifier="localhost"
        )
        req["User-Password"] = req.PwCrypt(self.user.username)
    
>       reply = srv.SendPacket(req)

tests/e2e/test_provider_radius.py:76: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyrad.client.Client object at 0x7f573d684190>
pkt = AuthPacket({'User-Name': ['sUdoVZIf1ER70A45VzbQ'], 'NAS-Identifier': ['localhost'], 'User-Password': [b'\r\x84b\x12\xf8\x1aK\xbc!\x90/\xfd\xce\xd4\xb4\xf8\x8f\x07\x0b@(\xd0\xba\xdeiP0!\xc6\xd9\xd9\x8e']})

    def SendPacket(self, pkt):
        """Send a packet to a RADIUS server.
    
        :param pkt: the packet to send
        :type pkt:  pyrad.packet.Packet
        :return:    the reply packet received
        :rtype:     pyrad.packet.Packet
        :raise Timeout: RADIUS server does not reply
        """
        if isinstance(pkt, packet.AuthPacket):
            if pkt.auth_type == 'eap-md5':
                # Creating EAP-Identity
                password = pkt[2][0] if 2 in pkt else pkt[1][0]
                pkt[79] = [struct.pack('!BBHB%ds' % len(password),
                                       EAP_CODE_RESPONSE,
                                       packet.CurrentID,
                                       len(password) + 5,
                                       EAP_TYPE_IDENTITY,
                                       password)]
>           reply = self._SendPacket(pkt, self.authport)

.venv/lib/python3.13................../site-packages/pyrad/client.py:194: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyrad.client.Client object at 0x7f573d684190>
pkt = AuthPacket({'User-Name': ['sUdoVZIf1ER70A45VzbQ'], 'NAS-Identifier': ['localhost'], 'User-Password': [b'\r\x84b\x12\xf8\x1aK\xbc!\x90/\xfd\xce\xd4\xb4\xf8\x8f\x07\x0b@(\xd0\xba\xdeiP0!\xc6\xd9\xd9\x8e']})
port = 1812

    def _SendPacket(self, pkt, port):
        """Send a packet to a RADIUS server.
    
        :param pkt:  the packet to send
        :type pkt:   pyrad.packet.Packet
        :param port: UDP port to send packet to
        :type port:  integer
        :return:     the reply packet received
        :rtype:      pyrad.packet.Packet
        :raise Timeout: RADIUS server does not reply
        """
        self._SocketOpen()
    
        for attempt in range(self.retries):
            if attempt and pkt.code == packet.AccountingRequest:
                if "Acct-Delay-Time" in pkt:
                    pkt["Acct-Delay-Time"] = \
                            pkt["Acct-Delay-Time"][0] + self.timeout
                else:
                    pkt["Acct-Delay-Time"] = self.timeout
    
            now = time.time()
            waitto = now + self.timeout
    
            self._socket.sendto(pkt.RequestPacket(), (self.server, port))
    
            while now < waitto:
                ready = self._poll.poll((waitto - now) * 1000)
    
                if ready:
                    rawreply = self._socket.recv(4096)
                else:
                    now = time.time()
                    continue
    
                try:
                    reply = pkt.CreateReply(packet=rawreply)
                    if pkt.VerifyReply(reply, rawreply):
                        return reply
                except packet.PacketError:
                    pass
    
                now = time.time()
    
>       raise Timeout
E       pyrad.client.Timeout

.venv/lib/python3.13................../site-packages/pyrad/client.py:173: Timeout

During handling of the above exception, another exception occurred:

self = <unittest.case._Outcome object at 0x7f573e4e6ba0>
test_case = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
subTest = False

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, subTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

.../hostedtoolcache/Python/3.13.9........./x64/lib/python3.13/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
result = <TestCaseFunction test_radius_bind_success>

    def run(self, result=None):
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            stopTestRun = getattr(result, 'stopTestRun', None)
            if startTestRun is not None:
                startTestRun()
        else:
            stopTestRun = None
    
        result.startTest(self)
        try:
            testMethod = getattr(self, self._testMethodName)
            if (getattr(self.__class__, "__unittest_skip__", False) or
                getattr(testMethod, "__unittest_skip__", False)):
                # If the class or method was skipped.
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                _addSkip(result, self, skip_why)
                return result
    
            expecting_failure = (
                getattr(self, "__unittest_expecting_failure__", False) or
                getattr(testMethod, "__unittest_expecting_failure__", False)
            )
            outcome = _Outcome(result)
            start_time = time.perf_counter()
            try:
                self._outcome = outcome
    
                with outcome.testPartExecutor(self):
                    self._callSetUp()
                if outcome.success:
                    outcome.expecting_failure = expecting_failure
                    with outcome.testPartExecutor(self):
>                       self._callTestMethod(testMethod)

.../hostedtoolcache/Python/3.13.9........./x64/lib/python3.13/unittest/case.py:651: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
method = <bound method TestProviderRadius.test_radius_bind_success of <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>>

    def _callTestMethod(self, method):
>       if method() is not None:

.../hostedtoolcache/Python/3.13.9........./x64/lib/python3.13/unittest/case.py:606: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
                raise exc
            logger.debug("Retrying on error", exc=exc, test=self)
            self.tearDown()
            self._post_teardown()
            self._pre_setup()
            self.setUp()
>           return wrapper(self, *args, **kwargs)

tests/e2e/utils.py:337: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
                raise exc
            logger.debug("Retrying on error", exc=exc, test=self)
            self.tearDown()
            self._post_teardown()
            self._pre_setup()
            self.setUp()
>           return wrapper(self, *args, **kwargs)

tests/e2e/utils.py:337: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
            return func(self, *args, **kwargs)
    
        except tuple(exceptions) as exc:
            count += 1
            if count > max_retires:
                logger.debug("Exceeded retry count", exc=exc, test=self)
    
>               raise exc

tests/e2e/utils.py:331: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>
args = (), kwargs = {}

    @wraps(func)
    def wrapper(self: TransactionTestCase, *args, **kwargs):
        """Run test again if we're below max_retries, including tearDown and
        setUp. Otherwise raise the error"""
        nonlocal count
        try:
>           return func(self, *args, **kwargs)

tests/e2e/utils.py:324: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>,)
kwargs = {}, file = 'default/flow-default-invalidation-flow.yaml'
content = 'version: 1\nmetadata:\n  name: Default - Invalidation flow\nentries:\n- attrs:\n    designation: invalidation\n    na...0\n    stage: !KeyOf default-invalidation-logout\n    target: !KeyOf flow\n  model: authentik_flows.flowstagebinding\n'

    @wraps(func)
    def wrapper(*args, **kwargs):
        for file in files:
            content = BlueprintInstance(path=file).retrieve()
            Importer.from_string(content).apply()
>       return func(*args, **kwargs)

.../blueprints/tests/__init__.py:25: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tests.e2e.test_provider_radius.TestProviderRadius testMethod=test_radius_bind_success>

    @retry(exceptions=[Timeout])
    @apply_blueprint(
        "default/flow-default-authentication-flow.yaml",
        "default/flow-default-invalidation-flow.yaml",
    )
    def test_radius_bind_success(self):
        """Test simple bind"""
        self._prepare()
        srv = Client(
            server="localhost",
            secret=self.shared_secret.encode(),
            dict=Dictionary(".../radius/dictionaries/dictionary"),
        )
    
        req = srv.CreateAuthPacket(
            code=AccessRequest, User_Name=self.user.username, NAS_Identifier="localhost"
        )
        req["User-Password"] = req.PwCrypt(self.user.username)
    
>       reply = srv.SendPacket(req)

tests/e2e/test_provider_radius.py:76: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyrad.client.Client object at 0x7f573cf34e10>
pkt = AuthPacket({'User-Name': ['k6H00JXyfI76ctsI1m9H'], 'NAS-Identifier': ['localhost'], 'User-Password': [b'\x15w\xd7\x8dqu!`\xfa\x84~\xe4\xb9\xe2n\x10\xb3\xfa\x875\xaf\n=\xa2\x8dVy0\xe2\xa2V\xb5']})

    def SendPacket(self, pkt):
        """Send a packet to a RADIUS server.
    
        :param pkt: the packet to send
        :type pkt:  pyrad.packet.Packet
        :return:    the reply packet received
        :rtype:     pyrad.packet.Packet
        :raise Timeout: RADIUS server does not reply
        """
        if isinstance(pkt, packet.AuthPacket):
            if pkt.auth_type == 'eap-md5':
                # Creating EAP-Identity
                password = pkt[2][0] if 2 in pkt else pkt[1][0]
                pkt[79] = [struct.pack('!BBHB%ds' % len(password),
                                       EAP_CODE_RESPONSE,
                                       packet.CurrentID,
                                       len(password) + 5,
                                       EAP_TYPE_IDENTITY,
                                       password)]
>           reply = self._SendPacket(pkt, self.authport)

.venv/lib/python3.13................../site-packages/pyrad/client.py:194: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyrad.client.Client object at 0x7f573cf34e10>
pkt = AuthPacket({'User-Name': ['k6H00JXyfI76ctsI1m9H'], 'NAS-Identifier': ['localhost'], 'User-Password': [b'\x15w\xd7\x8dqu!`\xfa\x84~\xe4\xb9\xe2n\x10\xb3\xfa\x875\xaf\n=\xa2\x8dVy0\xe2\xa2V\xb5']})
port = 1812

    def _SendPacket(self, pkt, port):
        """Send a packet to a RADIUS server.
    
        :param pkt:  the packet to send
        :type pkt:   pyrad.packet.Packet
        :param port: UDP port to send packet to
        :type port:  integer
        :return:     the reply packet received
        :rtype:      pyrad.packet.Packet
        :raise Timeout: RADIUS server does not reply
        """
        self._SocketOpen()
    
        for attempt in range(self.retries):
            if attempt and pkt.code == packet.AccountingRequest:
                if "Acct-Delay-Time" in pkt:
                    pkt["Acct-Delay-Time"] = \
                            pkt["Acct-Delay-Time"][0] + self.timeout
                else:
                    pkt["Acct-Delay-Time"] = self.timeout
    
            now = time.time()
            waitto = now + self.timeout
    
            self._socket.sendto(pkt.RequestPacket(), (self.server, port))
    
            while now < waitto:
                ready = self._poll.poll((waitto - now) * 1000)
    
                if ready:
                    rawreply = self._socket.recv(4096)
                else:
                    now = time.time()
                    continue
    
                try:
                    reply = pkt.CreateReply(packet=rawreply)
                    if pkt.VerifyReply(reply, rawreply):
                        return reply
                except packet.PacketError:
                    pass
    
                now = time.time()
    
>       raise Timeout
E       pyrad.client.Timeout

.venv/lib/python3.13................../site-packages/pyrad/client.py:173: Timeout

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:frontend Features or issues related to the browser, TypeScript, Node.js, etc

Projects

Status: Needs review

Development

Successfully merging this pull request may close these issues.

3 participants