Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor LTI 1.1 validator #44

Merged
merged 2 commits into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 52 additions & 41 deletions ltiauthenticator/lti11/auth.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,84 @@
from tornado import gen

from traitlets import Dict

from jupyterhub.app import JupyterHub
from jupyterhub.auth import Authenticator
from jupyterhub.handlers import BaseHandler
from jupyterhub.utils import url_path_join

from traitlets.config import Dict

from ltiauthenticator.lti11.handlers import LTI11AuthenticateHandler
from ltiauthenticator.lti11.validator import LTI11LaunchValidator
from ltiauthenticator.utils import convert_request_to_dict
from ltiauthenticator.utils import get_client_protocol


class LTI11Authenticator(Authenticator):
"""
JupyterHub Authenticator for use with LTI based services (EdX, Canvas, etc)
JupyterHub LTI 1.1 Authenticator which extends the ltiauthenticator.LTIAuthenticator class.
Messages sent to this authenticator are sent from a tool consumer (TC), such as
an LMS. JupyterHub, as the authenticator, works as the tool provider (TP), also
known as the external tool.

The LTIAuthenticator base class defines the consumers, defined as 1 or (n) consumer key
and shared secret k/v's to verify requests from their tool consumer.
"""

auto_login = True
login_service = "LTI"
login_service = "LTI 1.1"

consumers = Dict(
{},
config=True,
help="""
A dict of consumer keys mapped to consumer secrets for those keys.

Allows multiple consumers to securely send users to this JupyterHub
instance.
""",
)

def get_handlers(self, app):
def get_handlers(self, app: JupyterHub) -> BaseHandler:
return [("/lti/launch", LTI11AuthenticateHandler)]

@gen.coroutine
def authenticate(self, handler, data) -> dict:
# FIXME: Run a process that cleans up old nonces every other minute
def login_url(self, base_url):
return url_path_join(base_url, "/lti/launch")

async def authenticate( # noqa: C901
self, handler: BaseHandler, data: dict = None
) -> dict: # noqa: C901
"""
LTI 1.1 Authenticator. One or more consumer keys/values must be set in the jupyterhub config with the
LTI11Authenticator.consumers dict.

Args:
handler: JupyterHub's Authenticator handler object. For LTI 1.1 requests, the handler is
an instance of LTIAuthenticateHandler.
data: optional data object

Returns:
Authentication dictionary

Raises:
HTTPError if the required values are not in the request
"""
validator = LTI11LaunchValidator(self.consumers)

args = {}
for k, values in handler.request.body_arguments.items():
args[k] = (
values[0].decode() if len(values) == 1 else [v.decode() for v in values]
)

# handle multiple layers of proxied protocol (comma separated) and take the outermost
# value (first from the list)
if "x-forwarded-proto" in handler.request.headers:
# x-forwarded-proto might contain comma delimited values
# left-most value is the one sent by original client
hops = [
h.strip()
for h in handler.request.headers["x-forwarded-proto"].split(",")
]
protocol = hops[0]
else:
protocol = handler.request.protocol

launch_url = protocol + "://" + handler.request.host + handler.request.uri
self.log.debug(
"Original arguments received in request: %s" % handler.request.arguments
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will these be sensitive?

Copy link
Collaborator Author

@jgwerner jgwerner Apr 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yuvipanda Not that I am aware of. The launch request arguments could contain sensitive information if the integration with the LMS is set to public, in which case the username is fetched from one of the personally identifiable arguments, such as email. When the application is set to private, the user_id is an opaque identifier and not considered personally identifiable information (PII). But in these cases the arguments that would contain PII values are empty. The other arguments are related to context (courses) or assignments/modules (resources), etc. Lastly, there are oauth_* arguments but these shouldn't be that sensitive. We could obfuscate some or all arguments from the request for logging, what do you think?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that it's debug logging I think it's ok. We already log user_ids elsewhere in JupyterHub.

)

if validator.validate_launch_request(launch_url, handler.request.headers, args):
# Before we return lti_user_id, check to see if a canvas_custom_user_id was sent.
# If so, this indicates two things:
# 1. The request was sent from Canvas, not edX
# 2. The request was sent from a Canvas course not running in anonymous mode
# If this is the case we want to use the canvas ID to allow grade returns through the Canvas API
# If Canvas is running in anonymous mode, we'll still want the 'user_id' (which is the `lti_user_id`)
# extract the request arguments to a dict
args = convert_request_to_dict(handler.request.arguments)
self.log.debug("Decoded args from request: %s" % args)

# get the origin protocol
protocol = get_client_protocol(handler)
self.log.debug("Origin protocol is: %s" % protocol)

# build the full launch url value required for oauth1 signatures
launch_url = f"{protocol}://{handler.request.host}{handler.request.uri}"
self.log.debug("Launch url is: %s" % launch_url)

if validator.validate_launch_request(launch_url, handler.request.headers, args):
# get the lms vendor to implement optional logic for said vendor
canvas_id = handler.get_body_argument("custom_canvas_user_id", default=None)

if canvas_id is not None:
Expand All @@ -78,6 +92,3 @@ def authenticate(self, handler, data) -> dict:
k: v for k, v in args.items() if not k.startswith("oauth_")
},
}

def login_url(self, base_url):
jgwerner marked this conversation as resolved.
Show resolved Hide resolved
return url_path_join(base_url, "/lti/launch")
70 changes: 70 additions & 0 deletions ltiauthenticator/lti11/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# LTI 1.1 launch parameter names, grouped by how this package treats them.
# Defined from https://www.imsglobal.org/specs/ltiv1p1p1/implementation-guide
# user_id is listed as required here even though the guide defines it as
# recommended, because it is used as a fallback id for nbgrader's lms_user_id column.
LTI11_LAUNCH_PARAMS_REQUIRED = [
    "lti_message_type",
    "lti_version",
    "resource_link_id",
    "user_id",
]

LTI11_LAUNCH_PARAMS_RECOMMENDED = [
    "resource_link_title",
    "roles",
    "lis_person_name_given",
    "lis_person_name_family",
    "lis_person_name_full",
    "lis_person_contact_email_primary",
    "context_id",
    "context_title",
    "context_label",
    "launch_presentation_locale",
    "launch_presentation_document_target",
    "launch_presentation_width",
    "launch_presentation_height",
    "launch_presentation_return_url",
    "tool_consumer_info_product_family_code",
    "tool_consumer_info_version",
    "tool_consumer_instance_guid",
    "tool_consumer_instance_name",
    "tool_consumer_instance_contact_email",
]

# NOTE: the name below is misspelled ("OTIONAL"). It is kept unchanged so that
# existing imports continue to work; new code should use the alias that follows.
LTI11_LAUNCH_PARAMS_OTIONAL = [
    "resource_link_description",
    "user_image",
    "role_scope_mentor",
    "context_type",
    "launch_presentation_css_url",
    "tool_consumer_instance_description",
    "tool_consumer_instance_url",
]

# Correctly spelled alias bound to the same list object as the misspelled name.
LTI11_LAUNCH_PARAMS_OPTIONAL = LTI11_LAUNCH_PARAMS_OTIONAL

# lis_* parameters.
# NOTE(review): this list is not part of LTI11_LAUNCH_PARAMS_ALL below -- confirm
# that the omission is intentional.
LTI11_LIS_OPTION = [
    "lis_outcome_service_url",
    "lis_result_sourcedid",
    "lis_person_sourcedid",
    "lis_course_offering_sourcedid",
    "lis_course_section_sourcedid",
]

# https://www.imsglobal.org/specs/ltiv1p1/implementation-guide
# Section 4.2
LTI11_OAUTH_ARGS = [
    "oauth_consumer_key",
    "oauth_signature_method",
    "oauth_timestamp",
    "oauth_nonce",
    "oauth_callback",
    "oauth_version",
    "oauth_signature",
]

# Rebind the required list so that it also contains every OAuth argument. This
# builds a new list (the four LTI names followed by the OAuth names) rather than
# mutating the one defined at the top of the module.
LTI11_LAUNCH_PARAMS_REQUIRED = LTI11_LAUNCH_PARAMS_REQUIRED + LTI11_OAUTH_ARGS

# Required (including OAuth), recommended and optional names, in that order.
LTI11_LAUNCH_PARAMS_ALL = (
    LTI11_LAUNCH_PARAMS_REQUIRED
    + LTI11_LAUNCH_PARAMS_RECOMMENDED
    + LTI11_LAUNCH_PARAMS_OTIONAL
)
161 changes: 102 additions & 59 deletions ltiauthenticator/lti11/validator.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,35 @@
from collections import OrderedDict
import time

from tornado import web

from oauthlib.oauth1.rfc5849 import signature
from collections import OrderedDict

from tornado.web import HTTPError

from traitlets.config import LoggingConfigurable

from typing import Any
from typing import Dict

from .constants import LTI11_OAUTH_ARGS
from .constants import LTI11_LAUNCH_PARAMS_REQUIRED


class LTI11LaunchValidator(LoggingConfigurable):
"""
This class closely mimics the jupyterhub/ltiauthenticator LTILaunchValidator
base class. Inherits from the LoggingConfigurable traitlet to support logging.

Allows JupyterHub to verify LTI 1.1 compatible requests as a tool
provider (TP).

For an instance of this class to work, you need to set the consumer key and
shared secret key(s)/value(s) in `LTI11Authenticator` settings, which inherits
from the ``ltiauthenticator.LTIAuthenticator`` class. The key/value pairs are
defined as a dict using the ``consumers`` attribute.

class LTI11LaunchValidator:
# Record time when process starts, so we can reject requests made
# before this
PROCESS_START_TIME = int(time.time())
Attributes:
consumers: consumer key and shared secret key/value pair(s)
"""

# Keep a class-wide, global list of nonces so we can detect & reject
# replay attacks. This possibly makes this non-threadsafe, however.
Expand All @@ -18,57 +38,81 @@ class LTI11LaunchValidator:
def __init__(self, consumers):
self.consumers = consumers

def validate_launch_request(self, launch_url, headers, args):
"""
Validate a given launch request

launch_url: Full URL that the launch request was POSTed to
headers: k/v pair of HTTP headers coming in with the POST
args: dictionary of body arguments passed to the launch_url
Must have the following keys to be valid:
oauth_consumer_key, oauth_timestamp, oauth_nonce,
oauth_signature
def validate_launch_request(
self,
launch_url: str,
headers: Dict[str, Any],
args: Dict[str, Any],
) -> bool:
"""
Validate a given LTI 1.1 launch request. The arguments' k/v's are either
required, recommended, or optional. The required/recommended/optional
keys are defined as constants.

# Validate args!
if "oauth_consumer_key" not in args:
raise web.HTTPError(401, "oauth_consumer_key missing")
if args["oauth_consumer_key"] not in self.consumers:
raise web.HTTPError(401, "oauth_consumer_key not known")

if "oauth_signature" not in args:
raise web.HTTPError(401, "oauth_signature missing")
if "oauth_timestamp" not in args:
raise web.HTTPError(401, "oauth_timestamp missing")

# Allow 30s clock skew between LTI Consumer and Provider
# Also don't accept timestamps from before our process started, since that could be
# a replay attack - we won't have nonce lists from back then. This would allow users
# who can control / know when our process restarts to trivially do replay attacks.
oauth_timestamp = int(float(args["oauth_timestamp"]))
if (
int(time.time()) - oauth_timestamp > 30
or oauth_timestamp < LTI11LaunchValidator.PROCESS_START_TIME
):
raise web.HTTPError(401, "oauth_timestamp too old")

if "oauth_nonce" not in args:
raise web.HTTPError(401, "oauth_nonce missing")
if (
oauth_timestamp in LTI11LaunchValidator.nonces
and args["oauth_nonce"] in LTI11LaunchValidator.nonces[oauth_timestamp]
):
raise web.HTTPError(401, "oauth_nonce + oauth_timestamp already used")
LTI11LaunchValidator.nonces.setdefault(oauth_timestamp, set()).add(
args["oauth_nonce"]
)
Args:
launch_url: URL (base_url + path) that receives the launch request,
usually from a tool consumer.
headers: HTTP headers included with the POST request
args: the body sent to the launch url.

args_list = []
for key, values in args.items():
if type(values) is list:
args_list += [(key, value) for value in values]
else:
args_list.append((key, values))
Returns:
True if the validation passes, False otherwise.

Raises:
HTTPError if a required argument is not included in the POST request.
"""
# Ensure that required oauth_* body arguments are included in the request
for param in LTI11_OAUTH_ARGS:
if param not in args.keys():
raise HTTPError(
400, "Required oauth arg %s not included in request" % param
)
if not args.get(param):
raise HTTPError(
400, "Required oauth arg %s does not have a value" % param
)

# Ensure that consumer key is registered in jupyterhub_config.py
# LTI11Authenticator.consumers defined in parent class
if args["oauth_consumer_key"] not in self.consumers:
raise HTTPError(401, "unknown oauth_consumer_key")

# Ensure that required LTI 1.1 body arguments are included in the request
for param in LTI11_LAUNCH_PARAMS_REQUIRED:
if param not in args.keys():
raise HTTPError(
400, "Required LTI 1.1 arg arg %s not included in request" % param
)
if not args.get(param):
raise HTTPError(
400, "Required LTI 1.1 arg %s does not have a value" % param
)

# Inspiration to validate nonces/timestamps from OAuthlib
# https://github.com/oauthlib/oauthlib/blob/master/oauthlib/oauth1/rfc5849/endpoints/base.py#L147
if len(str(int(args["oauth_timestamp"]))) != 10:
raise HTTPError(401, "Invalid timestamp format.")
try:
ts = int(args["oauth_timestamp"])
except ValueError:
raise HTTPError(401, "Timestamp must be an integer.")
else:
# Reject timestamps that differ from the current time by more than 30 seconds (past or future)
if abs(time.time() - ts) > 30:
raise HTTPError(
401,
"Timestamp given is invalid, differ from "
"allowed by over %s seconds." % str(int(time.time() - ts)),
)
if (
ts in LTI11LaunchValidator.nonces
and args["oauth_nonce"] in LTI11LaunchValidator.nonces[ts]
):
raise HTTPError(401, "oauth_nonce + oauth_timestamp already used")
LTI11LaunchValidator.nonces.setdefault(ts, set()).add(args["oauth_nonce"])

# convert arguments dict back to a list of tuples for signature
args_list = [(k, v) for k, v in args.items()]

base_string = signature.signature_base_string(
"POST",
Expand All @@ -77,13 +121,12 @@ def validate_launch_request(self, launch_url, headers, args):
signature.collect_parameters(body=args_list, headers=headers)
),
)

consumer_secret = self.consumers[args["oauth_consumer_key"]]

sign = signature.sign_hmac_sha1(base_string, consumer_secret, None)
is_valid = signature.safe_string_equals(sign, args["oauth_signature"])

self.log.debug("signature in request: %s" % args["oauth_signature"])
self.log.debug("calculated signature: %s" % sign)
if not is_valid:
raise web.HTTPError(401, "Invalid oauth_signature")
raise HTTPError(401, "Invalid oauth_signature")

return True
Loading