-
Notifications
You must be signed in to change notification settings - Fork 16
/
decoder.py
313 lines (257 loc) · 12.8 KB
/
decoder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
"""
JWT decoder utility functions.
In most of this module, "decode" refers to both verifying and unpacking a JWT,
as a unified operation. (Reading the contents of an unverified JWT would be
a security risk in the general case.)
"""
import logging
import sys
import jwt
from django.conf import settings
from edx_django_utils.monitoring import set_custom_attribute
from jwkest.jwk import KEYS
from jwkest.jws import JWS
from rest_framework_jwt.settings import api_settings
from semantic_version import Version
from edx_rest_framework_extensions.settings import get_first_jwt_issuer, get_jwt_issuers
logger = logging.getLogger(__name__)
class JwtTokenVersion:
default_latest_supported = '1.2.0'
starting_version = '1.0.0'
added_version = '1.1.0'
def jwt_decode_handler(token, decode_symmetric_token=True):
"""
Decodes (and verifies) a JSON Web Token (JWT).
Notes:
* Requires "exp" and "iat" claims to be present in the token's payload.
* Aids debugging by logging InvalidTokenError log entries when decoding fails.
* Setting for JWT_DECODE_HANDLER expects a single argument, token. The argument decode_symmetric_token
is for internal use only.
Examples:
Use with `djangorestframework-jwt <https://getblimp.github.io/django-rest-framework-jwt/>`_, by changing
your Django settings:
.. code-block:: python
JWT_AUTH = {
'JWT_DECODE_HANDLER': 'edx_rest_framework_extensions.auth.jwt.decoder.jwt_decode_handler',
'JWT_ISSUER': 'https://the.jwt.issuer',
'JWT_SECRET_KEY': 'the-jwt-secret-key', (defaults to settings.SECRET_KEY)
'JWT_AUDIENCE': 'the-jwt-audience',
'JWT_PUBLIC_SIGNING_JWK_SET': 'the-jwk-set-of-public-signing-keys',
}
Warning:
Do **not** use this method internally. Only use it in ``JWT_DECODE_HANDLER`` like the above example.
Internally, use ``configured_jwt_decode_handler`` which respects the ``JWT_DECODE_HANDLER`` setting.
Args:
token (str): JWT to be decoded.
decode_symmetric_token (bool): Whether to decode symmetric tokens or not. Pass False for asymmetric tokens only
Returns:
dict: Decoded JWT payload.
Raises:
MissingRequiredClaimError: Either the exp or iat claims is missing from the JWT payload.
InvalidTokenError: Decoding fails.
"""
jwt_issuer = get_first_jwt_issuer()
_verify_jwt_signature(token, jwt_issuer, decode_symmetric_token=decode_symmetric_token)
decoded_token = _decode_and_verify_token(token, jwt_issuer)
return _set_token_defaults(decoded_token)
def configured_jwt_decode_handler(token):
"""
Calls the ``jwt_decode_handler`` configured in the ``JWT_DECODE_HANDLER`` setting.
"""
api_setting_jwt_decode_handler = api_settings.JWT_DECODE_HANDLER
return api_setting_jwt_decode_handler(token)
def get_asymmetric_only_jwt_decode_handler(token):
"""
Returns a jwt_decode_handler that will only validate asymmetrically signed JWTs.
WARNING: This will only work with a service that is configured to use the
jwt_decode_handler from this library. This can be used to decode an
already decoded JWT, to ensure it is asymmetrically signed. This check
can go away once the DEPR for symmetrically signed JWTs is complete:
https://github.com/openedx/public-engineering/issues/83
"""
return jwt_decode_handler(token, decode_symmetric_token=False)
def decode_jwt_scopes(token):
"""
Decode the JWT and return the scopes claim.
"""
return configured_jwt_decode_handler(token).get('scopes', [])
def decode_jwt_is_restricted(token):
"""
Decode the JWT and return the is_restricted claim.
"""
return configured_jwt_decode_handler(token).get('is_restricted', False)
def decode_jwt_filters(token):
"""
Decode the JWT, parse the filters claim, and return a
list of (filter_type, filter_value) tuples.
"""
return [
jwt_filter.split(':')
for jwt_filter in configured_jwt_decode_handler(token).get('filters', [])
]
def _set_token_defaults(token):
"""
Returns an updated token that includes default values for
fields that were introduced since the token was created
by checking its version number.
"""
def _verify_version(jwt_version):
supported_version = Version(
settings.JWT_AUTH.get('JWT_SUPPORTED_VERSION', JwtTokenVersion.default_latest_supported)
)
if jwt_version.major > supported_version.major:
logger.info('Token decode failed due to unsupported JWT version number [%s]', str(jwt_version))
raise jwt.InvalidTokenError('JWT version number [%s] is unsupported' % str(jwt_version))
def _get_and_set_version(token):
"""
Tokens didn't always contain a version number so we
default to a nominal starting number.
"""
if 'version' not in token:
token['version'] = str(JwtTokenVersion.starting_version)
return Version(token['version'])
def _set_is_restricted(token):
"""
We can safely default to False since all "restricted" tokens
created prior to the addition of the `is_restricted` flag were
always created as expired tokens. Expired tokens would not
validate and so would not get this far into the decoding process.
# TODO: ARCH-166
"""
if 'is_restricted' not in token:
token['is_restricted'] = False
def _set_filters(token):
"""
We can safely default to an empty list of filters since
previously created tokens were either "restricted" (always
expired) or had full access.
# TODO: ARCH-166
"""
if 'filters' not in token:
token['filters'] = []
token_version = _get_and_set_version(token)
_verify_version(token_version)
_set_is_restricted(token)
_set_filters(token)
return token
def _verify_jwt_signature(token, jwt_issuer, decode_symmetric_token):
"""
Verifies the JWT signature. Raises InvalidTokenError in the event of an error.
Arguments:
token (str): JWT to be decoded.
jwt_issuer (dict): A dict of JWT issuer related settings, containing the symmetric key.
decode_symmetric_token (bool): Whether to decode symmetric tokens or not. Pass False for asymmetric tokens only
"""
# .. custom_attribute_name: jwt_auth_check_symmetric_key
# .. custom_attribute_description: True if symmetric keys will also be used for checking
# the JWT signature, and False if only asymmetric keys will be used.
set_custom_attribute('jwt_auth_check_symmetric_key', decode_symmetric_token)
# For observability purposes, we will first try asymmetric keys only to verify
# that we no longer need the symmetric key. However, if this fails, we will
# continue on to the original code path and try all keys (including symmetric)
# and add monitoring to let us know. This is meant to be temporary, until we
# can fully retire code paths for symmetric keys, as part of
# DEPR: Symmetric JWTs: https://github.com/openedx/public-engineering/issues/83
# Use add_symmetric_keys=False to only include asymmetric keys at first
key_set = _get_signing_jwk_key_set(jwt_issuer, add_symmetric_keys=False)
# .. custom_attribute_name: jwt_auth_verify_asymmetric_keys_count
# .. custom_attribute_description: Number of JWT verification keys in use for this
# verification. Should be same as number of asymmetric public keys. This is
# intended to aid in key rotations; once the average count stabilizes at a
# higher number after adding a public key, it should be safe to change the secret key.
set_custom_attribute('jwt_auth_verify_asymmetric_keys_count', len(key_set))
try:
_ = JWS().verify_compact(token, key_set)
# .. custom_attribute_name: jwt_auth_asymmetric_verified
# .. custom_attribute_description: Whether the JWT was successfully verified
# using an asymmetric key.
set_custom_attribute('jwt_auth_asymmetric_verified', True)
return
except Exception: # pylint: disable=broad-except
# Continue to the old code path of trying all keys
pass
# The following is the original code that includes both the symmetric and asymmetric keys
# as requested with the decode_symmetric_token argument. Note that the check against
# the asymmetric keys here is redundant and unnecessary, but this code is temporary and
# will be simplified once symmetric keys have been fully retired.
key_set = _get_signing_jwk_key_set(jwt_issuer, add_symmetric_keys=decode_symmetric_token)
# .. custom_attribute_name: jwt_auth_verify_all_keys_count
# .. custom_attribute_description: Number of JWT verification keys in use for this
# verification. Should be same as number of asymmetric public keys, plus one if
# a symmetric key secret is set. This is intended to aid in key rotations; once
# the average count stabilizes at a higher number after adding a public key, it
# should be safe to change the secret key.
set_custom_attribute('jwt_auth_verify_all_keys_count', len(key_set))
try:
_ = JWS().verify_compact(token, key_set)
# .. custom_attribute_name: jwt_auth_symmetric_verified
# .. custom_attribute_description: Whether the JWT was successfully verified
# using a symmetric key.
# Note: Rather than using a single custom attribute like ``jwt_auth_verified``
# with values of 'symmetric' or 'asymmetric', we use two separate custom
# attribute names (e.g. jwt_auth_symmetric_verified and jwt_auth_asymmetric_verified),
# so that if each of these were set separately in the same request, they
# wouldn't clobber each other.
set_custom_attribute('jwt_auth_symmetric_verified', True)
return
except Exception as token_error:
# .. custom_attribute_name: jwt_auth_verification_failed
# .. custom_attribute_description: True if the JWT token verification failed.
set_custom_attribute('jwt_auth_verification_failed', True)
logger.exception('Token verification failed.')
exc_info = sys.exc_info()
raise jwt.InvalidTokenError(exc_info[2]) from token_error
def _decode_and_verify_token(token, jwt_issuer):
"""
Part of the verification implementation; must not be used in isolation,
as the signature is actually checked in a different function.
"""
options = {
'require': ["exp", "iat"],
'verify_exp': api_settings.JWT_VERIFY_EXPIRATION,
'verify_aud': settings.JWT_AUTH.get('JWT_VERIFY_AUDIENCE', True),
# See https://github.com/openedx/edx-drf-extensions/issues/327 for removing manual issuer verification.
'verify_iss': False, # Verified manually below
'verify_signature': False, # Verified with JWS already
}
decoded_token = jwt.decode(
token,
jwt_issuer['SECRET_KEY'],
options=options,
leeway=api_settings.JWT_LEEWAY,
audience=jwt_issuer['AUDIENCE'],
issuer=jwt_issuer['ISSUER'],
algorithms=[api_settings.JWT_ALGORITHM],
)
# See https://github.com/openedx/edx-drf-extensions/issues/327 for removing this manual issuer validation.
token_issuer = decoded_token.get('iss')
# .. custom_attribute_name: jwt_auth_issuer
# .. custom_attribute_description: Value set to the JWT auth issuer.
set_custom_attribute('jwt_auth_issuer', token_issuer)
issuer_matched = any(issuer['ISSUER'] == token_issuer for issuer in get_jwt_issuers())
if token_issuer == jwt_issuer['ISSUER']:
# .. custom_attribute_name: jwt_auth_issuer_verification
# .. custom_attribute_description: Depending on issuer verification, the value will
# be one of: matches-first-issuer, matches-later-issuer, or no-match.
set_custom_attribute('jwt_auth_issuer_verification', 'matches-first-issuer')
elif issuer_matched:
set_custom_attribute('jwt_auth_issuer_verification', 'matches-later-issuer')
else:
set_custom_attribute('jwt_auth_issuer_verification', 'no-match')
logger.info('Token decode failed due to mismatched issuer [%s]', token_issuer)
raise jwt.InvalidTokenError('%s is not a valid issuer.' % token_issuer)
return decoded_token
def _get_signing_jwk_key_set(jwt_issuer, add_symmetric_keys=True):
"""
Returns a JWK Keyset containing all active keys that are configured
for verifying signatures.
"""
key_set = KEYS()
# asymmetric keys
signing_jwk_set = settings.JWT_AUTH.get('JWT_PUBLIC_SIGNING_JWK_SET')
if signing_jwk_set:
key_set.load_jwks(signing_jwk_set)
if add_symmetric_keys:
# symmetric key
key_set.add({'key': jwt_issuer['SECRET_KEY'], 'kty': 'oct'})
return key_set