Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(wip): Add typing #81

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 31 additions & 27 deletions src/flask_principal.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
"""

from __future__ import with_statement
from typing import Optional
from typing import Callable
import typing_extensions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All tests are failing because they cannot find the typing_extensions library. It does not appear that the library was added to the dependencies. Also, look at what value this additional dependency adds.


__version__ = '0.4.0'

Expand All @@ -21,8 +24,9 @@

from collections import namedtuple

from flask import g, session, current_app, abort, request
from flask import g, session, current_app, abort, request, Response
from flask.signals import Namespace
import werkzeug

PY3 = sys.version_info[0] == 3

Expand Down Expand Up @@ -139,19 +143,19 @@ class Identity(object):
Needs that are provided by this identity should be added to the `provides`
set after loading.
"""
def __init__(self, id, auth_type=None):
def __init__(self, id: Optional[type[str | int]], auth_type: Optional[type[str | None]]=None) -> None:
self.id = id
self.auth_type = auth_type
self.provides = set()
self.provides: set[Need] = set()

def can(self, permission):
def can(self, permission: Permission) -> bool:
"""Whether the identity has access to the permission.

:param permission: The permission to test provision for.
"""
return permission.allows(self)

def __repr__(self):
def __repr__(self) -> str:
return '<{0} id="{1}" auth_type="{2}" provides={3}>'.format(
self.__class__.__name__, self.id, self.auth_type, self.provides
)
Expand All @@ -160,11 +164,11 @@ def __repr__(self):
class AnonymousIdentity(Identity):
"""An anonymous identity"""

def __init__(self):
def __init__(self) -> None:
Identity.__init__(self, None)


class IdentityContext(object):
class IdentityContext:
"""The context of an identity for a permission.

.. note:: The principal is usually created by the flask_principal.Permission.require method
Expand All @@ -175,19 +179,19 @@ class IdentityContext(object):
flow is continued (context manager) or the function is executed (decorator).
"""

def __init__(self, permission, http_exception=None):
def __init__(self, permission: Permission, http_exception: Optional[int | werkzeug.Response]=None) -> None:
self.permission = permission
self.http_exception = http_exception
"""The permission of this principal
"""

@property
def identity(self):
def identity(self) -> Identity:
"""The identity of this principal
"""
return g.identity

def can(self):
def can(self) -> bool:
"""Whether the identity has access to the permission
"""
return self.identity.can(self.permission)
Expand All @@ -200,38 +204,38 @@ def _decorated(*args, **kw):
return rv
return _decorated

def __enter__(self):
def __enter__(self) -> None:
# check the permission here
if not self.can():
if self.http_exception:
abort(self.http_exception, self.permission)
raise PermissionDenied(self.permission)

def __exit__(self, *args):
def __exit__(self, *args) -> typing_extensions.Literal[False]:
return False


class Permission(object):
class Permission:
"""Represents needs, any of which must be present to access a resource

:param needs: The needs for this permission
"""
def __init__(self, *needs):
def __init__(self, *needs: set[Need]) -> None:
"""A set of needs, any of which must be present in an identity to have
access.
"""

self.perms = {n: True for n in needs}

def _bool(self):
def _bool(self) -> bool:
return bool(self.can())

def __nonzero__(self):
def __nonzero__(self) -> bool:
"""Equivalent to ``self.can()``.
"""
return self._bool()

def __bool__(self):
def __bool__(self) -> bool:
"""Equivalent to ``self.can()``.
"""
return self._bool()
Expand Down Expand Up @@ -341,7 +345,7 @@ def issubset(self, other):
self.excludes.issubset(other.excludes)
)

def allows(self, identity):
def allows(self, identity: Identity) -> bool:
"""Whether the identity can access this permission.

:param identity: The identity
Expand All @@ -368,7 +372,7 @@ class Denial(Permission):
Shortcut class for passing excluded needs.
"""

def __init__(self, *excludes):
def __init__(self, *excludes) -> None:
self.perms = {e: False for e in excludes}


Expand All @@ -379,7 +383,7 @@ def session_identity_loader():
return identity


def session_identity_saver(identity):
def session_identity_saver(identity) -> None:
session['identity.id'] = identity.id
session['identity.auth_type'] = identity.auth_type
session.modified = True
Expand All @@ -393,7 +397,7 @@ class Principal(object):
identification.
:param skip_static: Whether to ignore static endpoints.
"""
def __init__(self, app=None, use_sessions=True, skip_static=False):
def __init__(self, app=None, use_sessions=True, skip_static=False) -> None:
self.identity_loaders = deque()
self.identity_savers = deque()
# XXX This will probably vanish for a better API
Expand All @@ -403,15 +407,15 @@ def __init__(self, app=None, use_sessions=True, skip_static=False):
if app is not None:
self.init_app(app)

def _init_app(self, app):
def _init_app(self, app) -> None:
from warnings import warn
warn(DeprecationWarning(
'_init_app is deprecated, use the new init_app '
'method instead.'), stacklevel=1
)
self.init_app(app)

def init_app(self, app):
def init_app(self, app) -> None:
if hasattr(app, 'static_url_path'):
self._static_path = app.static_url_path
else:
Expand All @@ -424,7 +428,7 @@ def init_app(self, app):
self.identity_loader(session_identity_loader)
self.identity_saver(session_identity_saver)

def set_identity(self, identity):
def set_identity(self, identity) -> None:
"""Set the current identity.

:param identity: The identity to set
Expand Down Expand Up @@ -472,18 +476,18 @@ def save_identity_to_weird_usecase(identity):
self.identity_savers.appendleft(f)
return f

def _set_thread_identity(self, identity):
def _set_thread_identity(self, identity) -> None:
g.identity = identity
identity_loaded.send(current_app._get_current_object(),
identity=identity)

def _on_identity_changed(self, app, identity):
def _on_identity_changed(self, app, identity) -> None:
if self._is_static_route():
return

self.set_identity(identity)

def _on_before_request(self):
def _on_before_request(self) -> None:
if self._is_static_route():
return

Expand Down
Loading