Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Organize into multiple files #2

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 2 additions & 164 deletions ltiauthenticator/__init__.py
Original file line number Diff line number Diff line change
@@ -1,164 +1,2 @@
import time

from traitlets import Bool, Dict
from tornado import gen, web

from jupyterhub.auth import Authenticator
from jupyterhub.handlers import BaseHandler
from jupyterhub.utils import url_path_join

from oauthlib.oauth1.rfc5849 import signature
from collections import OrderedDict

class LTILaunchValidator:
# Record time when process starts, so we can reject requests made
# before this
PROCESS_START_TIME = int(time.time())

# Keep a class-wide, global list of nonces so we can detect & reject
# replay attacks. This possibly makes this non-threadsafe, however.
nonces = OrderedDict()

def __init__(self, consumers):
self.consumers = consumers

def validate_launch_request(
self,
launch_url,
headers,
args
):
"""
Validate a given launch request

launch_url: Full URL that the launch request was POSTed to
headers: k/v pair of HTTP headers coming in with the POST
args: dictionary of body arguments passed to the launch_url
Must have the following keys to be valid:
oauth_consumer_key, oauth_timestamp, oauth_nonce,
oauth_signature
"""

# Validate args!
if 'oauth_consumer_key' not in args:
raise web.HTTPError(401, "oauth_consumer_key missing")
if args['oauth_consumer_key'] not in self.consumers:
raise web.HTTPError(401, "oauth_consumer_key not known")

if 'oauth_signature' not in args:
raise web.HTTPError(401, "oauth_signature missing")
if 'oauth_timestamp' not in args:
raise web.HTTPError(401, 'oauth_timestamp missing')

# Allow 30s clock skew between LTI Consumer and Provider
# Also don't accept timestamps from before our process started, since that could be
# a replay attack - we won't have nonce lists from back then. This would allow users
# who can control / know when our process restarts to trivially do replay attacks.
oauth_timestamp = int(float(args['oauth_timestamp']))
if (
int(time.time()) - oauth_timestamp > 30
or oauth_timestamp < LTILaunchValidator.PROCESS_START_TIME
):
raise web.HTTPError(401, "oauth_timestamp too old")

if 'oauth_nonce' not in args:
raise web.HTTPError(401, 'oauth_nonce missing')
if (
oauth_timestamp in LTILaunchValidator.nonces
and args['oauth_nonce'] in LTILaunchValidator.nonces[oauth_timestamp]
):
raise web.HTTPError(401, "oauth_nonce + oauth_timestamp already used")
LTILaunchValidator.nonces.setdefault(oauth_timestamp, set()).add(args['oauth_nonce'])


args_list = []
for key, values in args.items():
if type(values) is list:
args_list += [(key, value) for value in values]
else:
args_list.append((key, values))

base_string = signature.construct_base_string(
'POST',
signature.normalize_base_string_uri(launch_url),
signature.normalize_parameters(
signature.collect_parameters(body=args_list, headers=headers)
)
)

consumer_secret = self.consumers[args['oauth_consumer_key']]

sign = signature.sign_hmac_sha1(base_string, consumer_secret, None)
is_valid = signature.safe_string_equals(sign, args['oauth_signature'])

if not is_valid:
raise web.HTTPError(401, "Invalid oauth_signature")

return True


class LTIAuthenticator(Authenticator):
"""
JupyterHub Authenticator for use with LTI based services (EdX, Canvas, etc)
"""

auto_login = True
login_service = 'LTI'

consumers = Dict(
{},
config=True,
help="""
A dict of consumer keys mapped to consumer secrets for those keys.

Allows multiple consumers to securely send users to this JupyterHub
instance.
"""
)

def get_handlers(self, app):
return [
('/lti/launch', LTIAuthenticateHandler)
]


@gen.coroutine
def authenticate(self, handler, data=None):
# FIXME: Run a process that cleans up old nonces every other minute
validator = LTILaunchValidator(self.consumers)

args = {}
for k, values in handler.request.body_arguments.items():
args[k] = values[0].decode() if len(values) == 1 else [v.decode() for v in values]


if validator.validate_launch_request(
handler.request.full_url(),
handler.request.headers,
args
):
return {
'name': handler.get_body_argument('user_id'),
'auth_state': {k: v for k, v in args.items() if not k.startswith('oauth_')}
}


def login_url(self, base_url):
return url_path_join(base_url, '/lti/launch')


class LTIAuthenticateHandler(BaseHandler):
"""
Handler for /lti/launch

Implements v1 of the LTI protocol for passing authentication information
through.

If there's a custom parameter called 'next', will redirect user to
that URL after authentication. Else, will send them to /home.
"""

@gen.coroutine
def post(self):
user = yield self.login_user()
self.redirect(self.get_body_argument('custom_next', self.get_next_url()))
from .auth import LTIAuthenticator
from .validator import LTILaunchValidator, LTILaunchValidationError
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you omit the from .auth import LTIAuthenticator here in __init__, instead requiring users to explicitly import litauthenticator.validator or ltiauthenticator.auth, then the validator may be used without jupyterhub being importable.

That's a backward-incompatible change in terms of imports, but a minor one.

77 changes: 77 additions & 0 deletions ltiauthenticator/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from traitlets import Bool, Dict
from tornado import gen, web

from jupyterhub.auth import Authenticator
from jupyterhub.handlers import BaseHandler
from jupyterhub.utils import url_path_join

from .validator import LTILaunchValidator, LTILaunchValidationError

class LTIAuthenticator(Authenticator):
"""
JupyterHub Authenticator for use with LTI based services (EdX, Canvas, etc)
"""

auto_login = True
login_service = 'LTI'

consumers = Dict(
{},
config=True,
help="""
A dict of consumer keys mapped to consumer secrets for those keys.

Allows multiple consumers to securely send users to this JupyterHub
instance.
"""
)

def get_handlers(self, app):
return [
('/lti/launch', LTIAuthenticateHandler)
]


@gen.coroutine
def authenticate(self, handler, data=None):
# FIXME: Run a process that cleans up old nonces every other minute
validator = LTILaunchValidator(self.consumers)

args = {}
for k, values in handler.request.body_arguments.items():
args[k] = values[0].decode() if len(values) == 1 else [v.decode() for v in values]


try:
if validator.validate_launch_request(
handler.request.full_url(),
handler.request.headers,
args
):
return {
'name': handler.get_body_argument('user_id'),
'auth_state': {k: v for k, v in args.items() if not k.startswith('oauth_')}
}
except LTILaunchValidationError as e:
raise web.HTTPError(401, e.message)


def login_url(self, base_url):
return url_path_join(base_url, '/lti/launch')


class LTIAuthenticateHandler(BaseHandler):
"""
Handler for /lti/launch

Implements v1 of the LTI protocol for passing authentication information
through.

If there's a custom parameter called 'next', will redirect user to
that URL after authentication. Else, will send them to /home.
"""

@gen.coroutine
def post(self):
user = yield self.login_user()
self.redirect(self.get_body_argument('custom_next', self.get_next_url()))
96 changes: 96 additions & 0 deletions ltiauthenticator/validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import time

from oauthlib.oauth1.rfc5849 import signature
from collections import OrderedDict


class LTILaunchValidationError(Exception):
def __init__(self, message):
self.message = message


class LTILaunchValidator:
# Record time when process starts, so we can reject requests made
# before this
PROCESS_START_TIME = int(time.time())

# Keep a class-wide, global list of nonces so we can detect & reject
# replay attacks. This possibly makes this non-threadsafe, however.
nonces = OrderedDict()

def __init__(self, consumers):
self.consumers = consumers

def validate_launch_request(
self,
launch_url,
headers,
args
):
"""
Validate a given launch request

launch_url: Full URL that the launch request was POSTed to
headers: k/v pair of HTTP headers coming in with the POST
args: dictionary of body arguments passed to the launch_url
Must have the following keys to be valid:
oauth_consumer_key, oauth_timestamp, oauth_nonce,
oauth_signature
"""

# Validate args!
if 'oauth_consumer_key' not in args:
raise LTILaunchValidationError("oauth_consumer_key missing")
if args['oauth_consumer_key'] not in self.consumers:
raise LTILaunchValidationError("oauth_consumer_key not known")

if 'oauth_signature' not in args:
raise LTILaunchValidationError("oauth_signature missing")
if 'oauth_timestamp' not in args:
raise LTILaunchValidationError('oauth_timestamp missing')

# Allow 30s clock skew between LTI Consumer and Provider
# Also don't accept timestamps from before our process started, since that could be
# a replay attack - we won't have nonce lists from back then. This would allow users
# who can control / know when our process restarts to trivially do replay attacks.
oauth_timestamp = int(float(args['oauth_timestamp']))
if (
int(time.time()) - oauth_timestamp > 30
or oauth_timestamp < LTILaunchValidator.PROCESS_START_TIME
):
raise LTILaunchValidationError("oauth_timestamp too old")

if 'oauth_nonce' not in args:
raise LTILaunchValidationError('oauth_nonce missing')
if (
oauth_timestamp in LTILaunchValidator.nonces
and args['oauth_nonce'] in LTILaunchValidator.nonces[oauth_timestamp]
):
raise LTILaunchValidationError("oauth_nonce + oauth_timestamp already used")
LTILaunchValidator.nonces.setdefault(oauth_timestamp, set()).add(args['oauth_nonce'])


args_list = []
for key, values in args.items():
if type(values) is list:
args_list += [(key, value) for value in values]
else:
args_list.append((key, values))

base_string = signature.construct_base_string(
'POST',
signature.normalize_base_string_uri(launch_url),
signature.normalize_parameters(
signature.collect_parameters(body=args_list, headers=headers)
)
)

consumer_secret = self.consumers[args['oauth_consumer_key']]

sign = signature.sign_hmac_sha1(base_string, consumer_secret, None)
is_valid = signature.safe_string_equals(sign, args['oauth_signature'])

if not is_valid:
raise LTILaunchValidationError("Invalid oauth_signature")

return True
Loading