Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #70. Implemented registry pattern for class-based algorithms #71

Merged
merged 9 commits into from
Jan 18, 2015
225 changes: 31 additions & 194 deletions jwt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@
http://self-issued.info/docs/draft-jones-json-web-token-01.html
"""

import base64
import binascii
import hashlib
import hmac
from datetime import datetime, timedelta

from calendar import timegm
from collections import Mapping
from datetime import datetime, timedelta

from .compat import (json, string_types, text_type, constant_time_compare,
timedelta_total_seconds)
from jwt.utils import base64url_decode, base64url_encode

from .compat import (json, string_types, text_type, timedelta_total_seconds)


__version__ = '0.4.1'
__all__ = [
# Functions
'encode',
'decode',
'register_algorithm',

# Exceptions
'InvalidTokenError',
Expand All @@ -33,9 +33,25 @@
# Deprecated aliases
'ExpiredSignature',
'InvalidAudience',
'InvalidIssuer',
'InvalidIssuer'
]

_algorithms = {}


def register_algorithm(alg_id, alg_obj):
""" Registers a new Algorithm for use when creating and verifying JWTs """
if alg_id in _algorithms:
raise ValueError('Algorithm already has a handler.')

if not isinstance(alg_obj, Algorithm):
raise TypeError('Object is not of type `Algorithm`')

_algorithms[alg_id] = alg_obj

from jwt.algorithms import Algorithm, _register_default_algorithms # NOQA
_register_default_algorithms()


class InvalidTokenError(Exception):
pass
Expand All @@ -56,187 +72,11 @@ class InvalidAudienceError(InvalidTokenError):
class InvalidIssuerError(InvalidTokenError):
pass


# Compatibility aliases (deprecated)
ExpiredSignature = ExpiredSignatureError
InvalidAudience = InvalidAudienceError
InvalidIssuer = InvalidIssuerError

signing_methods = {
'none': lambda msg, key: b'',
'HS256': lambda msg, key: hmac.new(key, msg, hashlib.sha256).digest(),
'HS384': lambda msg, key: hmac.new(key, msg, hashlib.sha384).digest(),
'HS512': lambda msg, key: hmac.new(key, msg, hashlib.sha512).digest()
}

verify_methods = {
'HS256': lambda msg, key: hmac.new(key, msg, hashlib.sha256).digest(),
'HS384': lambda msg, key: hmac.new(key, msg, hashlib.sha384).digest(),
'HS512': lambda msg, key: hmac.new(key, msg, hashlib.sha512).digest()
}


def prepare_HS_key(key):
if not isinstance(key, string_types) and not isinstance(key, bytes):
raise TypeError('Expecting a string- or bytes-formatted key.')

if isinstance(key, text_type):
key = key.encode('utf-8')

return key

prepare_key_methods = {
'none': lambda key: None,
'HS256': prepare_HS_key,
'HS384': prepare_HS_key,
'HS512': prepare_HS_key
}

try:
from cryptography.hazmat.primitives import interfaces, hashes
from cryptography.hazmat.primitives.serialization import (
load_pem_private_key, load_pem_public_key, load_ssh_public_key
)
from cryptography.hazmat.primitives.asymmetric import ec, padding
from cryptography.hazmat.backends import default_backend
from cryptography.exceptions import InvalidSignature

def sign_rsa(msg, key, hashalg):
signer = key.signer(
padding.PKCS1v15(),
hashalg
)

signer.update(msg)
return signer.finalize()

def verify_rsa(msg, key, hashalg, sig):
verifier = key.verifier(
sig,
padding.PKCS1v15(),
hashalg
)

verifier.update(msg)

try:
verifier.verify()
return True
except InvalidSignature:
return False

signing_methods.update({
'RS256': lambda msg, key: sign_rsa(msg, key, hashes.SHA256()),
'RS384': lambda msg, key: sign_rsa(msg, key, hashes.SHA384()),
'RS512': lambda msg, key: sign_rsa(msg, key, hashes.SHA512())
})

verify_methods.update({
'RS256': lambda msg, key, sig: verify_rsa(msg, key, hashes.SHA256(), sig),
'RS384': lambda msg, key, sig: verify_rsa(msg, key, hashes.SHA384(), sig),
'RS512': lambda msg, key, sig: verify_rsa(msg, key, hashes.SHA512(), sig)
})

def prepare_RS_key(key):
if isinstance(key, interfaces.RSAPrivateKey) or \
isinstance(key, interfaces.RSAPublicKey):
return key

if isinstance(key, string_types):
if isinstance(key, text_type):
key = key.encode('utf-8')

try:
if key.startswith(b'ssh-rsa'):
key = load_ssh_public_key(key, backend=default_backend())
else:
key = load_pem_private_key(key, password=None, backend=default_backend())
except ValueError:
key = load_pem_public_key(key, backend=default_backend())
else:
raise TypeError('Expecting a PEM-formatted key.')

return key

prepare_key_methods.update({
'RS256': prepare_RS_key,
'RS384': prepare_RS_key,
'RS512': prepare_RS_key
})

def sign_ecdsa(msg, key, hashalg):
signer = key.signer(ec.ECDSA(hashalg))

signer.update(msg)
return signer.finalize()

def verify_ecdsa(msg, key, hashalg, sig):
verifier = key.verifier(sig, ec.ECDSA(hashalg))

verifier.update(msg)

try:
verifier.verify()
return True
except InvalidSignature:
return False

signing_methods.update({
'ES256': lambda msg, key: sign_ecdsa(msg, key, hashes.SHA256()),
'ES384': lambda msg, key: sign_ecdsa(msg, key, hashes.SHA384()),
'ES512': lambda msg, key: sign_ecdsa(msg, key, hashes.SHA512()),
})

verify_methods.update({
'ES256': lambda msg, key, sig: verify_ecdsa(msg, key, hashes.SHA256(), sig),
'ES384': lambda msg, key, sig: verify_ecdsa(msg, key, hashes.SHA384(), sig),
'ES512': lambda msg, key, sig: verify_ecdsa(msg, key, hashes.SHA512(), sig),
})

def prepare_ES_key(key):
if isinstance(key, interfaces.EllipticCurvePrivateKey) or \
isinstance(key, interfaces.EllipticCurvePublicKey):
return key

if isinstance(key, string_types):
if isinstance(key, text_type):
key = key.encode('utf-8')

# Attempt to load key. We don't know if it's
# a Signing Key or a Verifying Key, so we try
# the Verifying Key first.
try:
key = load_pem_public_key(key, backend=default_backend())
except ValueError:
key = load_pem_private_key(key, password=None, backend=default_backend())

else:
raise TypeError('Expecting a PEM-formatted key.')

return key

prepare_key_methods.update({
'ES256': prepare_ES_key,
'ES384': prepare_ES_key,
'ES512': prepare_ES_key
})

except ImportError:
pass


def base64url_decode(input):
rem = len(input) % 4

if rem > 0:
input += b'=' * (4 - rem)

return base64.urlsafe_b64decode(input)


def base64url_encode(input):
return base64.urlsafe_b64encode(input).replace(b'=', b'')


def header(jwt):
if isinstance(jwt, text_type):
Expand Down Expand Up @@ -290,8 +130,10 @@ def encode(payload, key, algorithm='HS256', headers=None, json_encoder=None):
# Segments
signing_input = b'.'.join(segments)
try:
key = prepare_key_methods[algorithm](key)
signature = signing_methods[algorithm](signing_input, key)
alg_obj = _algorithms[algorithm]
key = alg_obj.prepare_key(key)
signature = alg_obj.sign(signing_input, key)

except KeyError:
raise NotImplementedError('Algorithm not supported')

Expand Down Expand Up @@ -360,17 +202,12 @@ def verify_signature(payload, signing_input, header, signature, key='',
raise TypeError('audience must be a string or None')

try:
algorithm = header['alg'].upper()
key = prepare_key_methods[algorithm](key)
alg_obj = _algorithms[header['alg'].upper()]
key = alg_obj.prepare_key(key)

if algorithm.startswith('HS'):
expected = verify_methods[algorithm](signing_input, key)
if not alg_obj.verify(signing_input, key, signature):
raise DecodeError('Signature verification failed')

if not constant_time_compare(signature, expected):
raise DecodeError('Signature verification failed')
else:
if not verify_methods[algorithm](signing_input, key, signature):
raise DecodeError('Signature verification failed')
except KeyError:
raise DecodeError('Algorithm not supported')

Expand Down
Loading